diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a1de2ee71ac12fba4a43676c589c9acb323492ac..68db811b1273128a679573a41c5e669df531c83a 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -59,36 +59,36 @@ typedef struct SSyncLogBuffer { } SSyncLogBuffer; // SSyncLogRepMgr -SSyncLogReplMgr* syncLogReplMgrCreate(); -void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); -void syncLogReplMgrReset(SSyncLogReplMgr* pMgr); +SSyncLogReplMgr* syncLogReplCreate(); +void syncLogReplDestroy(SSyncLogReplMgr* pMgr); +void syncLogReplReset(SSyncLogReplMgr* pMgr); -int32_t syncNodeLogReplMgrInit(SSyncNode* pNode); -void syncNodeLogReplMgrDestroy(SSyncNode* pNode); +int32_t syncNodeLogReplInit(SSyncNode* pNode); +void syncNodeLogReplDestroy(SSyncNode* pNode); // access -static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { +static FORCE_INLINE int64_t syncLogReplGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS; } -static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { +static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); } -SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier); +int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); -int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); +int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); // SSyncLogBuffer SSyncLogBuffer* syncLogBufferCreate(); @@ -111,8 +111,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); -int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, - int32_t applyCode); +int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, + int32_t applyCode); #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index a60f43cd5ed053fc00eba4b97c32e908698ea499..7c343c0e5d343e79d9f74e63b9548830e9b78a5f 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -85,7 +85,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); return -1; } - (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); + (void)syncLogReplProcessReply(pMgr, ths, pMsg); } return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a617a5d293fce6ac377a613e47fe0f0e58d08b23..c25ea24249bd43fd623828c20545aece3fdfc4af 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -949,7 +949,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->changing = false; // replication mgr - if (syncNodeLogReplMgrInit(pSyncNode) < 0) { + if (syncNodeLogReplInit(pSyncNode) < 0) { sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr()); goto _error; } @@ -1122,7 +1122,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { syncNodeStopPingTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode); - syncNodeLogReplMgrDestroy(pSyncNode); + syncNodeLogReplDestroy(pSyncNode); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); pSyncNode->pSyncRespMgr = NULL; @@ -2164,7 +2164,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); ASSERT(terrno != 0); - (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno); + (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno); syncEntryDestroy(pEntry); return -1; } @@ -2374,7 +2374,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); - return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg); + return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg); } int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 6600b505c18405c40abe262655898d20c078fd60..04e52b3f493fc304dcde395afb879a586d152aa6 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -86,7 +86,7 @@ _err: return -1; } -SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncRaftEntry* pEntry = NULL; SyncIndex prevIndex = index - 1; @@ -316,7 +316,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1); + SyncTerm term = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1); ASSERT(pEntry->term >= 0); if (term == pEntry->term) { ret = 0; @@ -351,7 +351,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index); + SyncTerm existPrevTerm = syncLogReplGetPrevLogTerm(NULL, pNode, index); ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm)); ret = 0; goto _out; @@ -482,8 +482,8 @@ _out: return matchIndex; } -int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, - int32_t applyCode) { +int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, + int32_t applyCode) { if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) { return 0; } @@ -564,7 +564,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } - if (syncLogFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) { + if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64, vgId, pEntry->index, pEntry->term, role, currentTerm); @@ -611,7 +611,7 @@ _out: return ret; } -void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { +void syncLogReplReset(SSyncLogReplMgr* pMgr) { if (pMgr == NULL) return; ASSERT(pMgr->startIndex >= 0); @@ -625,14 +625,14 @@ void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { pMgr->retryBackoff = 0; } -int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->endIndex <= pMgr->startIndex) { return 0; } SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, pDestId->addr); return -1; @@ -640,7 +640,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t ret = -1; bool retried = false; - int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); + int64_t retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr); int64_t nowMs = taosGetMonoTimestampMs(); int count = 0; int64_t firstIndex = -1; @@ -657,7 +657,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->states[pos].acked) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index, pDestId->addr); goto _out; @@ -666,7 +666,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } bool barrier = false; - if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; @@ -687,7 +687,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ret = 0; _out: if (retried) { - pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); + pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr); SSyncLogBuffer* pBuf = pNode->pLogBuf; sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64 ", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 @@ -698,7 +698,7 @@ _out: return ret; } -int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); @@ -716,7 +716,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p } } else { if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { - syncLogReplMgrRetryOnNeed(pMgr, pNode); + syncLogReplRetryOnNeed(pMgr, pNode); return 0; } @@ -750,7 +750,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { - term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1); + term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1); if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); @@ -773,53 +773,53 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p } // attempt to replicate the raft log at index - (void)syncLogReplMgrReset(pMgr); - return syncLogReplMgrReplicateProbe(pMgr, pNode, index); + (void)syncLogReplReset(pMgr); + return syncLogReplReplicateProbe(pMgr, pNode, index); } -int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { +int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } taosThreadMutexUnlock(&pBuf->mutex); return 0; } -int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; } if (pMgr->restored) { - (void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg); + (void)syncLogReplProcessReplyAsNormal(pMgr, pNode, pMsg); } else { - (void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg); + (void)syncLogReplProcessReplyAsRecovery(pMgr, pNode, pMsg); } taosThreadMutexUnlock(&pBuf->mutex); return 0; } -int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { - (void)syncLogReplMgrReplicateAttempt(pMgr, pNode); + (void)syncLogReplReplicateAttempt(pMgr, pNode); } else { - (void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); + (void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); @@ -829,12 +829,12 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) { return 0; } - (void)syncLogReplMgrReset(pMgr); + (void)syncLogReplReset(pMgr); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -857,7 +857,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy return 0; } -int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -879,7 +879,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -902,7 +902,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) } } - syncLogReplMgrRetryOnNeed(pMgr, pNode); + syncLogReplRetryOnNeed(pMgr, pNode); SSyncLogBuffer* pBuf = pNode->pLogBuf; sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 @@ -913,7 +913,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) return 0; } -int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { ASSERT(pMgr->restored == true); if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { @@ -932,10 +932,10 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo pMgr->startIndex = pMgr->matchIndex; } - return syncLogReplMgrReplicateAttempt(pMgr, pNode); + return syncLogReplReplicateAttempt(pMgr, pNode); } -SSyncLogReplMgr* syncLogReplMgrCreate() { +SSyncLogReplMgr* syncLogReplCreate() { SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr)); if (pMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -949,7 +949,7 @@ SSyncLogReplMgr* syncLogReplMgrCreate() { return pMgr; } -void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { +void syncLogReplDestroy(SSyncLogReplMgr* pMgr) { if (pMgr == NULL) { return; } @@ -957,10 +957,10 @@ void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { return; } -int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { +int32_t syncNodeLogReplInit(SSyncNode* pNode) { for (int i = 0; i < TSDB_MAX_REPLICA; i++) { ASSERT(pNode->logReplMgrs[i] == NULL); - pNode->logReplMgrs[i] = syncLogReplMgrCreate(); + pNode->logReplMgrs[i] = syncLogReplCreate(); if (pNode->logReplMgrs[i] == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -970,9 +970,9 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { return 0; } -void syncNodeLogReplMgrDestroy(SSyncNode* pNode) { +void syncNodeLogReplDestroy(SSyncNode* pNode) { for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - syncLogReplMgrDestroy(pNode->logReplMgrs[i]); + syncLogReplDestroy(pNode->logReplMgrs[i]); pNode->logReplMgrs[i] = NULL; } } @@ -1103,7 +1103,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // reset repl mgr for (int i = 0; i < pNode->replicaNum; i++) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); @@ -1127,8 +1127,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { +int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SRpcMsg msgOut = {0}; bool inBuf = false; @@ -1143,14 +1143,14 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy if (pMgr) { sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr, terrstr(), index); - (void)syncLogReplMgrReset(pMgr); + (void)syncLogReplReset(pMgr); } } goto _err; } *pBarrier = syncLogIsReplicationBarrier(pEntry); - prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); + prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index); if (prevLogTerm < 0) { sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); goto _err; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 1d94b288d3a337d23efa5324a32df5f1ca564821..43d2bc839be87cc6343065b72e71402f20b445e7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -52,7 +52,7 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); - syncLogReplMgrReset(pMgr); + syncLogReplReset(pMgr); taosThreadMutexUnlock(&pBuf->mutex); return 0; } @@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { continue; } SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - (void)syncLogReplMgrReplicateOnce(pMgr, pNode); + (void)syncLogReplReplicateOnce(pMgr, pNode); } return 0; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a519c76cda53a5f66faeaefa17f56053c5d083e6..056a5977770c92f91c7b55e3c9554b5ccb7bafd2 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -125,7 +125,7 @@ static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t buf pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } -static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { +static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { int len = 0; len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; i++) { @@ -178,7 +178,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); char replMgrStatesStr[1024] = ""; - syncLogReplMgrStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); + syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); char bufferStatesStr[256] = ""; syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));