diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 8f83e21ce4ee845c49084e86838a3a35c95d7592..166a363adaa3c1814b42d5fd49e2881b964dd3e2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -144,6 +144,7 @@ typedef struct SSyncLogStore { // refactor, log[0 .. n] ==> log[m .. n] int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); + int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index e0cbcf074458f90e861a27ca4a0a1c026ed0fa03..9f03ac3e558af787d2f401a0864213fc8a002ef6 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore); void raftStoreNextTerm(SRaftStore *pRaftStore); void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson); -cJSON * raftStore2Json(SRaftStore *pRaftStore); -char * raftStore2Str(SRaftStore *pRaftStore); +cJSON *raftStore2Json(SRaftStore *pRaftStore); +char *raftStore2Str(SRaftStore *pRaftStore); // for debug ------------------- void raftStorePrint(SRaftStore *pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 6d0971a1fa03ecd03a6fe68cc95512e8f6a5fe83..6f725e123e225c6d0f4b6cfbe69027cb5e53223c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -469,6 +469,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); + sInfo("syncNodeMakeLogSame, from %ld to %ld", delBegin, delEnd); + logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; } @@ -495,19 +497,30 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { // prevLogIndex == -1 static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { + sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex); return true; } SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); if (pMsg->prevLogIndex > myLastIndex) { + sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex, + myLastIndex); return false; } SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { + sTrace( + "syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, " + "myPreLogTerm:%lu", + pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm); return true; } + sTrace( + "syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, " + "myPreLogTerm:%lu", + pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm); return false; } @@ -517,7 +530,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // print log char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); + snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId, + ths->pRaftStore->currentTerm); syncAppendEntriesLog2(logBuf, pMsg); // if I am standby, to be added into a raft group, I should process SyncAppendEntries msg @@ -551,6 +565,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; if (condition) { + sTrace("recv SyncAppendEntries, candidate to follower"); + syncNodeBecomeFollower(ths); // do not reply? return ret; @@ -586,6 +602,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2 || condition3; if (condition) { + sTrace( + "recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, " + "condition1:%d, condition2:%d, condition3:%d", + myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore), + ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3); + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; @@ -623,6 +645,12 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool condition = condition1 || condition2; if (condition) { + sTrace( + "recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, " + "condition2:%d, logOK:%d", + ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore), + condition1, condition2, logOK); + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; @@ -660,6 +688,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; + sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex, + hasExtraEntries, hasAppendEntries); + if (hasExtraEntries) { // make log same, rollback deleted entries code = syncNodeMakeLogSame(ths, pMsg); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7feb7d293ffb734e286e2d1b1adb2ceb1580d903..d7a8828d94e81f60a61bac74969e637dadfcfe0c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -56,6 +56,8 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b return 0; } +int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; } + static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal;