未验证 提交 db1cb4a8 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20793 from taosdata/FIX/TD-23509-main

enh: refactor some func names of syncLogReplMgr
...@@ -59,36 +59,36 @@ typedef struct SSyncLogBuffer { ...@@ -59,36 +59,36 @@ typedef struct SSyncLogBuffer {
} SSyncLogBuffer; } SSyncLogBuffer;
// SSyncLogRepMgr // SSyncLogRepMgr
SSyncLogReplMgr* syncLogReplMgrCreate(); SSyncLogReplMgr* syncLogReplCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr); void syncLogReplDestroy(SSyncLogReplMgr* pMgr);
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr); void syncLogReplReset(SSyncLogReplMgr* pMgr);
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode); int32_t syncNodeLogReplInit(SSyncNode* pNode);
void syncNodeLogReplMgrDestroy(SSyncNode* pNode); void syncNodeLogReplDestroy(SSyncNode* pNode);
// access // access
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) { static FORCE_INLINE int64_t syncLogReplGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS; return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
} }
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF); return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF);
} }
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier); SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
// SSyncLogBuffer // SSyncLogBuffer
SSyncLogBuffer* syncLogBufferCreate(); SSyncLogBuffer* syncLogBufferCreate();
...@@ -111,8 +111,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, ...@@ -111,8 +111,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode); int32_t applyCode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -85,7 +85,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -85,7 +85,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
return -1; return -1;
} }
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); (void)syncLogReplProcessReply(pMgr, ths, pMsg);
} }
return 0; return 0;
} }
...@@ -949,7 +949,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -949,7 +949,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->changing = false; pSyncNode->changing = false;
// replication mgr // replication mgr
if (syncNodeLogReplMgrInit(pSyncNode) < 0) { if (syncNodeLogReplInit(pSyncNode) < 0) {
sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr()); sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr());
goto _error; goto _error;
} }
...@@ -1122,7 +1122,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1122,7 +1122,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeStopPingTimer(pSyncNode); syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeLogReplMgrDestroy(pSyncNode); syncNodeLogReplDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL; pSyncNode->pSyncRespMgr = NULL;
...@@ -2164,7 +2164,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -2164,7 +2164,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
ASSERT(terrno != 0); ASSERT(terrno != 0);
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno); (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; return -1;
} }
...@@ -2374,7 +2374,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2374,7 +2374,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg); return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
} }
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
......
...@@ -86,7 +86,7 @@ _err: ...@@ -86,7 +86,7 @@ _err:
return -1; return -1;
} }
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
SyncIndex prevIndex = index - 1; SyncIndex prevIndex = index - 1;
...@@ -316,7 +316,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -316,7 +316,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 ", %" PRId64 ")", " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
SyncTerm term = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index + 1); SyncTerm term = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1);
ASSERT(pEntry->term >= 0); ASSERT(pEntry->term >= 0);
if (term == pEntry->term) { if (term == pEntry->term) {
ret = 0; ret = 0;
...@@ -351,7 +351,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -351,7 +351,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index); SyncTerm existPrevTerm = syncLogReplGetPrevLogTerm(NULL, pNode, index);
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm)); ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
ret = 0; ret = 0;
goto _out; goto _out;
...@@ -482,8 +482,8 @@ _out: ...@@ -482,8 +482,8 @@ _out:
return matchIndex; return matchIndex;
} }
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) { int32_t applyCode) {
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) { if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
return 0; return 0;
} }
...@@ -564,7 +564,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -564,7 +564,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pEntry->term, TMSG_INFO(pEntry->originalRpcType));
} }
if (syncLogFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) { if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64, ", role:%d, current term:%" PRId64,
vgId, pEntry->index, pEntry->term, role, currentTerm); vgId, pEntry->index, pEntry->term, role, currentTerm);
...@@ -611,7 +611,7 @@ _out: ...@@ -611,7 +611,7 @@ _out:
return ret; return ret;
} }
void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { void syncLogReplReset(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) return; if (pMgr == NULL) return;
ASSERT(pMgr->startIndex >= 0); ASSERT(pMgr->startIndex >= 0);
...@@ -625,14 +625,14 @@ void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) { ...@@ -625,14 +625,14 @@ void syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
pMgr->retryBackoff = 0; pMgr->retryBackoff = 0;
} }
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->endIndex <= pMgr->startIndex) { if (pMgr->endIndex <= pMgr->startIndex) {
return 0; return 0;
} }
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
pDestId->addr); pDestId->addr);
return -1; return -1;
...@@ -640,7 +640,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -640,7 +640,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
int32_t ret = -1; int32_t ret = -1;
bool retried = false; bool retried = false;
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr); int64_t retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
int count = 0; int count = 0;
int64_t firstIndex = -1; int64_t firstIndex = -1;
...@@ -657,7 +657,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -657,7 +657,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->states[pos].acked) { if (pMgr->states[pos].acked) {
if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId,
index, pDestId->addr); index, pDestId->addr);
goto _out; goto _out;
...@@ -666,7 +666,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -666,7 +666,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
} }
bool barrier = false; bool barrier = false;
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr); terrstr(), index, pDestId->addr);
goto _out; goto _out;
...@@ -687,7 +687,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -687,7 +687,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ret = 0; ret = 0;
_out: _out:
if (retried) { if (retried) {
pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64 sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64
", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 ", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
...@@ -698,7 +698,7 @@ _out: ...@@ -698,7 +698,7 @@ _out:
return ret; return ret;
} }
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId; SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false); ASSERT(pMgr->restored == false);
...@@ -716,7 +716,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -716,7 +716,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p
} }
} else { } else {
if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
syncLogReplMgrRetryOnNeed(pMgr, pNode); syncLogReplRetryOnNeed(pMgr, pNode);
return 0; return 0;
} }
...@@ -750,7 +750,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -750,7 +750,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
term = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index + 1); term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1);
if ((index + 1 < firstVer) || (term < 0) || if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
...@@ -773,53 +773,53 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -773,53 +773,53 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p
} }
// attempt to replicate the raft log at index // attempt to replicate the raft log at index
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplReset(pMgr);
return syncLogReplMgrReplicateProbe(pMgr, pNode, index); return syncLogReplReplicateProbe(pMgr, pNode, index);
} }
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
} }
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
} }
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64
", old:%" PRId64, ", old:%" PRId64,
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
} }
if (pMgr->restored) { if (pMgr->restored) {
(void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg); (void)syncLogReplProcessReplyAsNormal(pMgr, pNode, pMsg);
} else { } else {
(void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg); (void)syncLogReplProcessReplyAsRecovery(pMgr, pNode, pMsg);
} }
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
} }
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) { if (pMgr->restored) {
(void)syncLogReplMgrReplicateAttempt(pMgr, pNode); (void)syncLogReplReplicateAttempt(pMgr, pNode);
} else { } else {
(void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); (void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
} }
return 0; return 0;
} }
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored); ASSERT(!pMgr->restored);
ASSERT(pMgr->startIndex >= 0); ASSERT(pMgr->startIndex >= 0);
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
...@@ -829,12 +829,12 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy ...@@ -829,12 +829,12 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) { nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
return 0; return 0;
} }
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplReset(pMgr);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
SyncTerm term = -1; SyncTerm term = -1;
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr); terrstr(), index, pDestId->addr);
return -1; return -1;
...@@ -857,7 +857,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy ...@@ -857,7 +857,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
return 0; return 0;
} }
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored); ASSERT(pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
...@@ -879,7 +879,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) ...@@ -879,7 +879,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
SyncTerm term = -1; SyncTerm term = -1;
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr); terrstr(), index, pDestId->addr);
return -1; return -1;
...@@ -902,7 +902,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) ...@@ -902,7 +902,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
} }
} }
syncLogReplMgrRetryOnNeed(pMgr, pNode); syncLogReplRetryOnNeed(pMgr, pNode);
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
...@@ -913,7 +913,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) ...@@ -913,7 +913,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
return 0; return 0;
} }
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true); ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
...@@ -932,10 +932,10 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo ...@@ -932,10 +932,10 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
pMgr->startIndex = pMgr->matchIndex; pMgr->startIndex = pMgr->matchIndex;
} }
return syncLogReplMgrReplicateAttempt(pMgr, pNode); return syncLogReplReplicateAttempt(pMgr, pNode);
} }
SSyncLogReplMgr* syncLogReplMgrCreate() { SSyncLogReplMgr* syncLogReplCreate() {
SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr)); SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
if (pMgr == NULL) { if (pMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -949,7 +949,7 @@ SSyncLogReplMgr* syncLogReplMgrCreate() { ...@@ -949,7 +949,7 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
return pMgr; return pMgr;
} }
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) { if (pMgr == NULL) {
return; return;
} }
...@@ -957,10 +957,10 @@ void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) { ...@@ -957,10 +957,10 @@ void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
return; return;
} }
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { int32_t syncNodeLogReplInit(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA; i++) { for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
ASSERT(pNode->logReplMgrs[i] == NULL); ASSERT(pNode->logReplMgrs[i] == NULL);
pNode->logReplMgrs[i] = syncLogReplMgrCreate(); pNode->logReplMgrs[i] = syncLogReplCreate();
if (pNode->logReplMgrs[i] == NULL) { if (pNode->logReplMgrs[i] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -970,9 +970,9 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) { ...@@ -970,9 +970,9 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
return 0; return 0;
} }
void syncNodeLogReplMgrDestroy(SSyncNode* pNode) { void syncNodeLogReplDestroy(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA; i++) { for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
syncLogReplMgrDestroy(pNode->logReplMgrs[i]); syncLogReplDestroy(pNode->logReplMgrs[i]);
pNode->logReplMgrs[i] = NULL; pNode->logReplMgrs[i] = NULL;
} }
} }
...@@ -1103,7 +1103,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -1103,7 +1103,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// reset repl mgr // reset repl mgr
for (int i = 0; i < pNode->replicaNum; i++) { for (int i = 0; i < pNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
} }
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
...@@ -1127,8 +1127,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, ...@@ -1127,8 +1127,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
return pEntry; return pEntry;
} }
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier) { SRaftId* pDestId, bool* pBarrier) {
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
SRpcMsg msgOut = {0}; SRpcMsg msgOut = {0};
bool inBuf = false; bool inBuf = false;
...@@ -1143,14 +1143,14 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy ...@@ -1143,14 +1143,14 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
if (pMgr) { if (pMgr) {
sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId,
pDestId->addr, terrstr(), index); pDestId->addr, terrstr(), index);
(void)syncLogReplMgrReset(pMgr); (void)syncLogReplReset(pMgr);
} }
} }
goto _err; goto _err;
} }
*pBarrier = syncLogIsReplicationBarrier(pEntry); *pBarrier = syncLogIsReplicationBarrier(pEntry);
prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index);
if (prevLogTerm < 0) { if (prevLogTerm < 0) {
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
goto _err; goto _err;
......
...@@ -52,7 +52,7 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) { ...@@ -52,7 +52,7 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
syncLogReplMgrReset(pMgr); syncLogReplReset(pMgr);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
} }
...@@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { ...@@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
continue; continue;
} }
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogReplMgrReplicateOnce(pMgr, pNode); (void)syncLogReplReplicateOnce(pMgr, pNode);
} }
return 0; return 0;
} }
......
...@@ -125,7 +125,7 @@ static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t buf ...@@ -125,7 +125,7 @@ static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t buf
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
} }
static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int len = 0; int len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{"); len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) { for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
...@@ -178,7 +178,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -178,7 +178,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char replMgrStatesStr[1024] = ""; char replMgrStatesStr[1024] = "";
syncLogReplMgrStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));
char bufferStatesStr[256] = ""; char bufferStatesStr[256] = "";
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr)); syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册