diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d65d791147ca20bb586ce2553195b2941ed1f607..32b6424ac2ee78fd50c5e8541b30e158150ca613 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1259,11 +1259,9 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "first start"); } - if (syncNodeIsMnode(pSyncNode)) { - int32_t ret = 0; - ret = syncNodeStartPingTimer(pSyncNode); - ASSERT(ret == 0); - } + int32_t ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -1276,11 +1274,9 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); ASSERT(ret == 0); - if (pSyncNode->vgId == 1) { - int32_t ret = 0; - ret = syncNodeStartPingTimer(pSyncNode); - ASSERT(ret == 0); - } + ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); } void syncNodeClose(SSyncNode* pSyncNode) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0ce1f5989a3945bcdab37407ad0ca31350281542..e7b712acc98e985edd67250534541d690d6d568a 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -55,7 +55,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { // maybe start snapshot SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - if (nextIndex < logStartIndex || nextIndex > logEndIndex) { + if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%ld, start:%ld, end:%ld", nextIndex, logStartIndex, + logEndIndex); + syncNodeEventLog(pSyncNode, logBuf); + // start snapshot int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); ASSERT(code == 0); @@ -128,12 +133,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { } int32_t syncNodeReplicate(SSyncNode* pSyncNode) { - syncNodeEventLog(pSyncNode, "do replicate"); - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; } + syncNodeEventLog(pSyncNode, "do replicate"); + int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); @@ -170,7 +175,17 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest int32_t ret = 0; if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg); + + } else { + char logBuf[128]; + char host[64]; + int16_t port; + syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); + + snprintf(logBuf, sizeof(logBuf), "do not repcate to %s:%d for index:%ld", host, port, pMsg->prevLogIndex + 1); + syncNodeEventLog(pSyncNode, logBuf); } + return ret; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 62a81133f30bf1e3e23839be15b53392c780c412..87368cb45c2a411c7f24c7b5e582b5352b1f31dc 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -60,6 +60,9 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t syncNodeTimerRoutine(SSyncNode* ths) { syncNodeEventLog(ths, "timer routines"); + // timer replicate + syncNodeReplicate(ths); + if (syncNodeIsMnode(ths)) { syncNodeCleanConfigIndex(ths); }