From 3e6d6fe89b5e05018e0c97a2ee55cb5bc027788b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 19 Oct 2022 19:12:04 +0800 Subject: [PATCH] refactor(sync): do replicate in timer routine --- source/libs/sync/src/syncMain.c | 16 ++++++---------- source/libs/sync/src/syncReplication.c | 21 ++++++++++++++++++--- source/libs/sync/src/syncTimeout.c | 3 +++ 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d65d791147..32b6424ac2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1259,11 +1259,9 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "first start"); } - if (syncNodeIsMnode(pSyncNode)) { - int32_t ret = 0; - ret = syncNodeStartPingTimer(pSyncNode); - ASSERT(ret == 0); - } + int32_t ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -1276,11 +1274,9 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); ASSERT(ret == 0); - if (pSyncNode->vgId == 1) { - int32_t ret = 0; - ret = syncNodeStartPingTimer(pSyncNode); - ASSERT(ret == 0); - } + ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); } void syncNodeClose(SSyncNode* pSyncNode) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0ce1f5989a..e7b712acc9 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -55,7 +55,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { // maybe start snapshot SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - if (nextIndex < logStartIndex || nextIndex > logEndIndex) { + if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%ld, start:%ld, end:%ld", nextIndex, logStartIndex, + logEndIndex); + syncNodeEventLog(pSyncNode, logBuf); + // start snapshot int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); ASSERT(code == 0); @@ -128,12 +133,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { } int32_t syncNodeReplicate(SSyncNode* pSyncNode) { - syncNodeEventLog(pSyncNode, "do replicate"); - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; } + syncNodeEventLog(pSyncNode, "do replicate"); + int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); @@ -170,7 +175,17 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest int32_t ret = 0; if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg); + + } else { + char logBuf[128]; + char host[64]; + int16_t port; + syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); + + snprintf(logBuf, sizeof(logBuf), "do not repcate to %s:%d for index:%ld", host, port, pMsg->prevLogIndex + 1); + syncNodeEventLog(pSyncNode, logBuf); } + return ret; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 62a81133f3..87368cb45c 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -60,6 +60,9 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t syncNodeTimerRoutine(SSyncNode* ths) { syncNodeEventLog(ths, "timer routines"); + // timer replicate + syncNodeReplicate(ths); + if (syncNodeIsMnode(ths)) { syncNodeCleanConfigIndex(ths); } -- GitLab