提交 44994201 编写于 作者: M Minghao Li

fix(sync): sending snapshot

上级 ac90f61b
......@@ -39,6 +39,8 @@ char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore);
char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
// SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
// SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
......
......@@ -454,16 +454,16 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex localPreLogIndex;
SyncTerm localPreLogTerm;
ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm);
ASSERT(ret == 0);
SyncIndex localLastIndex;
SyncTerm localLastTerm;
ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm);
ASSERT(ret == 0);
bool logOK;
if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) {
SyncIndex logFirstIndex = logStoreFirstIndex(ths->pLogStore);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (logFirstIndex > snapshot.lastApplyIndex) {
logOK = false;
} else if (syncNodeIsIndexInSnapshot(ths, pMsg->prevLogIndex)) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
......@@ -478,6 +478,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm);
} else {
ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm);
ASSERT(ret == 0);
logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) &&
......@@ -510,6 +513,21 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
SyncIndex localLastIndex;
SyncTerm localLastTerm;
if (logFirstIndex == SYNC_INDEX_INVALID) {
localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore);
localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore);
} else if (logFirstIndex > snapshot.lastApplyIndex) {
localLastIndex = ths->pLogStore->getLastIndex(ths->pLogStore);
localLastTerm = ths->pLogStore->getLastTerm(ths->pLogStore);
} else {
ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm);
ASSERT(ret == 0);
}
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
......
......@@ -258,6 +258,12 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
return serialized;
}
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walGetFirstVer(pWal);
}
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
......
......@@ -151,8 +151,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) ||
syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
// will send this msg until snapshot receive finish!
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
SSnapshot snapshot = pSender->snapshot;
sInfo("nextIndex:%ld in snapshot: <lastApplyIndex:%ld, lastApplyTerm:%lu>, begin snapshot", nextIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
......@@ -162,6 +162,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
preLogIndex = snapshot.lastApplyIndex;
preLogTerm = snapshot.lastApplyTerm;
// update next index!
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, snapshot.lastApplyIndex + 1);
// to claim leader
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL);
......
......@@ -41,6 +41,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
} else {
sInfo("snapshotSenderCreate cannot create sender");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册