diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 55f67c430e8a53c0087b204283c9daffb099a417..11e3cbd494731d7e05e5353c8b9bb29b0d2978bf 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -41,7 +41,7 @@ extern "C" { #define SNAPSHOT_WAIT_MS 1000 * 30 #define SYNC_MAX_RETRY_BACKOFF 5 -#define SYNC_LOG_REPL_RETRY_WAIT_MS 50 +#define SYNC_LOG_REPL_RETRY_WAIT_MS 100 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 #define SYNC_MAX_BATCH_SIZE 1 diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 3c485b48722a1e9a67e4eab7dc6f3c4046cc067f..1af31efd5dc22fdc94db178e7e7c811563ad7b87 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -566,33 +566,42 @@ int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { return 0; } -_Atomic int64_t tsRetryCnt = 0; - int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->endIndex <= pMgr->startIndex) { return 0; } + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; + if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { + syncLogReplMgrReset(pMgr); + sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer: %" PRIx64, pNode->vgId, + pDestId->addr); + return -1; + } + int32_t ret = -1; bool retried = false; int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); + int64_t nowMs = taosGetMonoTimestampMs(); + int count = 0; + int64_t firstIndex = -1; + SyncTerm term = -1; for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) { int64_t pos = index % pMgr->size; ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex)); - if (pMgr->states[pos].acked) { - continue; - } - int64_t nowMs = taosGetMonoTimestampMs(); + if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) { break; } - SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - bool barrier = false; - SyncTerm term = -1; + if (pMgr->states[pos].acked) { + continue; + } + + bool barrier = false; if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; } @@ -601,13 +610,19 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { pMgr->states[pos].term = term; pMgr->states[pos].acked = false; retried = true; - tsRetryCnt++; + if (firstIndex == -1) firstIndex = index; + count++; } ret = 0; _out: if (retried) { pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); + sInfo("vgId:%d, resend %d raft log entries. dest: %" PRIx64 ", for indexes: %" PRId64 + " etc., maybe of term: %" PRId64 ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 + ")", + pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, + pMgr->endIndex); } return ret; } @@ -771,9 +786,10 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int32_t batchSize = TMAX(1, pMgr->size / 20); int32_t count = 0; int64_t nowMs = taosGetMonoTimestampMs(); + int64_t limit = pMgr->size >> 1; for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { - if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) { + if (batchSize < count++ || limit <= index - pMgr->startIndex) { break; } if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { @@ -800,12 +816,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p } } + syncLogReplMgrRetryOnNeed(pMgr, pNode); + SSyncLogBuffer* pBuf = pNode->pLogBuf; sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - syncLogReplMgrRetryOnNeed(pMgr, pNode); return 0; }