From 327b938cbb7ca7f8a4465f1b6a912d2670a0fd66 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 5 Aug 2022 15:33:44 +0800 Subject: [PATCH] refactor(sync): make leader life longer --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 10 ++- source/libs/sync/src/syncRaftLog.c | 84 -------------------------- source/libs/sync/src/syncRequestVote.c | 20 +++--- source/libs/sync/test/sh/a.sh | 12 ++++ 5 files changed, 33 insertions(+), 94 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index bc3275a971..82399f52b9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -212,6 +212,7 @@ void syncNodeRelease(SSyncNode* pNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); +void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5af5b5f988..15617523fa 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1987,6 +1987,12 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { } } +void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { + if (term > pSyncNode->pRaftStore->currentTerm) { + raftStoreSetTerm(pSyncNode->pRaftStore, term); + } +} + void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -2614,7 +2620,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p // fsync once SSyncLogStoreData* pData = ths->pLogStore->data; SWal* pWal = pData->pWal; - walFsync(pWal, true); + walFsync(pWal, false); if (ths->replicaNum > 1) { // if multi replica, start replicate right now @@ -2801,7 +2807,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ESyncState state = flag; char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); + snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex); syncNodeEventLog(ths, eventLog); // execute fsm diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index bf440f04a0..b575e40d86 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -237,51 +237,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr return 0; } -#if 0 -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - SyncIndex writeIndex = raftLogWriteIndex(pLogStore); - if (pEntry->index != writeIndex) { - sError("vgId:%d, wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId, - pEntry->index, writeIndex); - pEntry->index = writeIndex; - } - - int code = 0; - SWalSyncInfo syncMeta; - syncMeta.isWeek = pEntry->isWeak; - syncMeta.seqNum = pEntry->seqNum; - syncMeta.term = pEntry->term; - code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); - syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); - } - - // walFsync(pWal, true); - - do { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); - syncNodeEventLog(pData->pSyncNode, eventLog); - } while (0); - - return code; -} -#endif - // entry found, return 0 // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST // other error, return -1 @@ -400,45 +355,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp //------------------------------- // log[0 .. n] -#if 0 -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - - SyncIndex lastIndex = logStoreLastIndex(pLogStore); - ASSERT(pEntry->index == lastIndex + 1); - - int code = 0; - SWalSyncInfo syncMeta; - syncMeta.isWeek = pEntry->isWeak; - syncMeta.seqNum = pEntry->seqNum; - syncMeta.term = pEntry->term; - code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - pEntry->index, err, err, errStr, sysErr, sysErrStr); - syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); - } - - // walFsync(pWal, true); - - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "old write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); - syncNodeEventLog(pData->pSyncNode, eventLog); - - return code; -} -#endif - int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 806c138b72..b47294a8e1 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -55,14 +55,13 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); - // log not ok, do not update term, ignore it - if (pMsg->term > ths->pRaftStore->currentTerm && !logOK) { - return -1; - } - // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); + if (logOK) { + syncNodeUpdateTerm(ths, pMsg->term); + } else { + syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); + } } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); @@ -164,13 +163,18 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { return -1; } + bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); + if (logOK) { + syncNodeUpdateTerm(ths, pMsg->term); + } else { + syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); + } } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { diff --git a/source/libs/sync/test/sh/a.sh b/source/libs/sync/test/sh/a.sh index 751b42b9c2..3983d30b7c 100644 --- a/source/libs/sync/test/sh/a.sh +++ b/source/libs/sync/test/sh/a.sh @@ -81,4 +81,16 @@ for file in `ls ${logpath}/log.dnode*vgId*`;do done +echo "" +echo "generate log.commit ..." +tmpfile=${logpath}/log.commits.tmp +touch ${tmpfile} +for file in `ls ${logpath}/log.dnode*.vgId*.commit`;do + line=`cat ${file} | tail -n1` + echo $line | awk '{print $5, $0}' >> ${tmpfile} +done +cat ${tmpfile} | sort -k1 > ${logpath}/log.commits + exit 0 + + -- GitLab