提交 8680da28 编写于 作者: M Minghao Li

fix(sync): snapshot maybe change when sending

上级 24a6b352
......@@ -687,7 +687,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, 0x11);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
}
......
......@@ -178,9 +178,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex;
sTrace("sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term,
ths->pRaftStore->currentTerm);
} else {
// start send snapshot, first time
sTrace("sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term,
ths->pRaftStore->currentTerm);
snapshotSenderDoStart(pSender);
pSender->start = true;
sentryIndex = pSender->snapshot.lastApplyIndex;
......
......@@ -94,7 +94,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// execute fsm
if (pSyncNode->pFsm != NULL) {
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, 0x1);
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
ASSERT(code == 0);
#if 0
......
......@@ -1558,7 +1558,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
assert(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
syncNodeReplicate(ths);
}
......@@ -1620,7 +1621,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
assert(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
// start replicate right now!
syncNodeReplicate(ths);
......@@ -1692,8 +1694,9 @@ const char* syncStr(ESyncState state) {
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
sInfo("sync commit from %ld to %ld, flag:0x%lX", beginIndex, endIndex, flag);
int32_t code = 0;
ESyncState state = flag;
sInfo("sync event commit from %ld to %ld, %s", beginIndex, endIndex, syncUtilState2String(state));
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
......
......@@ -156,6 +156,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
}
walFsync(pWal, true);
sTrace("sync event write wal: %ld", pEntry->index);
return code;
}
......@@ -309,6 +312,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
// assert(code == 0);
walFsync(pWal, true);
sTrace("sync event old write wal: %ld", pEntry->index);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册