提交 082428ac 编写于 作者: B Benguang Zhao

enh: reduce the number of probing msgs under stress in recovery mode

上级 736a1cc0
...@@ -87,7 +87,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy ...@@ -87,7 +87,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
SRaftId* pDestId, bool* pBarrier); SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
......
...@@ -612,6 +612,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -612,6 +612,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].term = term; pMgr->states[pos].term = term;
pMgr->states[pos].acked = false; pMgr->states[pos].acked = false;
retried = true; retried = true;
if (firstIndex == -1) firstIndex = index; if (firstIndex == -1) firstIndex = index;
count++; count++;
...@@ -658,6 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -658,6 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->matchIndex = pMsg->matchIndex;
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
...@@ -667,8 +669,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -667,8 +669,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
} }
if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, sWarn("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64,
pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex); pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex);
if (syncNodeStartSnapshot(pNode, &destId) < 0) { if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port);
return -1; return -1;
...@@ -676,8 +678,6 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -676,8 +678,6 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port);
return 0; return 0;
} }
(void)syncLogReplMgrReset(pMgr);
} }
// check last match term // check last match term
...@@ -709,24 +709,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -709,24 +709,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
} }
// attempt to replicate the raft log at index // attempt to replicate the raft log at index
bool barrier = false; (void)syncLogReplMgrReset(pMgr);
ASSERT(index >= 0); return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index);
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", peer %s:%d", pNode->vgId,
terrstr(), index, host, port);
return -1;
}
int64_t nowMs = taosGetMonoTimestampMs();
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].term = term;
pMgr->states[index % pMgr->size].acked = false;
pMgr->matchIndex = index;
pMgr->startIndex = index;
pMgr->endIndex = index + 1;
return 0;
} }
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
...@@ -766,14 +750,23 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -766,14 +750,23 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) { if (pMgr->restored) {
(void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
} else { } else {
(void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode); (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex);
} }
return 0; return 0;
} }
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored); ASSERT(!pMgr->restored);
SyncIndex index = pNode->pLogBuf->matchIndex; ASSERT(pMgr->startIndex >= 0);
int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
int64_t nowMs = taosGetMonoTimestampMs();
if (pMgr->endIndex > pMgr->startIndex &&
nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
return 0;
}
(void)syncLogReplMgrReset(pMgr);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
SyncTerm term = -1; SyncTerm term = -1;
...@@ -783,6 +776,15 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -783,6 +776,15 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
return -1; return -1;
} }
ASSERT(index >= 0);
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].term = term;
pMgr->states[index % pMgr->size].acked = false;
pMgr->startIndex = index;
pMgr->endIndex = index + 1;
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册