From aea4e0f4722ebe43e703e2f820944a75b766f1b1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 18 Mar 2022 19:09:22 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/src/syncAppendEntries.c | 5 ++++- source/libs/sync/src/syncAppendEntriesReply.c | 5 ++++- source/libs/sync/src/syncCommit.c | 5 +++-- source/libs/sync/src/syncRequestVote.c | 5 ++++- source/libs/sync/src/syncRequestVoteReply.c | 5 ++++- source/libs/sync/test/syncElectTest.cpp | 5 ++--- source/libs/sync/test/syncReplicateTest.cpp | 10 +++++----- 7 files changed, 26 insertions(+), 14 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 270180e347..e4df93ca47 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -87,7 +87,10 @@ // int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; - syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm); + syncAppendEntriesLog2(logBuf, pMsg); if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 9db9a3e8ac..817974fd26 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -37,7 +37,10 @@ // int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; - syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm); + syncAppendEntriesReplyLog2(logBuf, pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 8c6071c621..0d4df8e6cf 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -50,9 +50,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // update commit index SyncIndex newCommitIndex = pSyncNode->commitIndex; for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; - ++index) { + --index) { bool agree = syncAgree(pSyncNode, index); - sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld", agree, index); + sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index, + pSyncNode->commitIndex); if (agree) { // term SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 062b0244bd..e23748a81e 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -43,7 +43,10 @@ // int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t ret = 0; - syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%lu", ths->pRaftStore->currentTerm); + syncRequestVoteLog2(logBuf, pMsg); if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 12603bb337..1531f701ff 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -38,7 +38,10 @@ // int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0; - syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%lu", ths->pRaftStore->currentTerm); + syncRequestVoteReplyLog2(logBuf, pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 502263cfbf..0e0b57a025 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -116,10 +116,9 @@ int main(int argc, char** argv) { //--------------------------- while (1) { - sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", - gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, + sTrace("elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); - taosMsleep(1000); } return 0; diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index a9a3942d69..6c4fab2425 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -119,7 +119,7 @@ SRpcMsg *step0(int i) { SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg)); pMsg->msgType = 9999; - pMsg->contLen = 32; + pMsg->contLen = 128; pMsg->pCont = malloc(pMsg->contLen); snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); return pMsg; @@ -172,14 +172,14 @@ int main(int argc, char **argv) { gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); taosMsleep(1000); - sTrace("replicate sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", - gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, + sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); } while (1) { - sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", - gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, + sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); taosMsleep(1000); } -- GitLab