diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 20003791600e1b326cac33f72d2ae223fecedd9d..00f3791ea255644b8afe3cdbf85cb320de10d721 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -843,6 +843,14 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { ASSERT(pMgr->restored == true); if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { + if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { + int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs; + int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs; + int64_t timeDiffMs = lastSentMs - firstSentMs; + if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) { + pMgr->retryBackoff -= 1; + } + } pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex); for (SyncIndex index = pMgr->startIndex; index < pMgr->matchIndex; index++) {