提交 f6be5f2c 编写于 作者: B Benguang Zhao

enh: change sync log repl mgr to sync log repl in logging msg

上级 edc9fe97
...@@ -633,7 +633,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -633,7 +633,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
pDestId->addr); pDestId->addr);
return -1; return -1;
} }
...@@ -658,8 +658,8 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -658,8 +658,8 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->states[pos].acked) { if (pMgr->states[pos].acked) {
if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index,
index, pDestId->addr); pDestId->addr);
goto _out; goto _out;
} }
continue; continue;
...@@ -708,7 +708,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod ...@@ -708,7 +708,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
ASSERT(pMgr->matchIndex == 0); ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) { if (pMsg->matchIndex < 0) {
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
...@@ -725,7 +725,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod ...@@ -725,7 +725,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->matchIndex = pMsg->matchIndex; pMgr->matchIndex = pMsg->matchIndex;
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
...@@ -781,7 +781,7 @@ int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -781,7 +781,7 @@ int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
...@@ -794,8 +794,7 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp ...@@ -794,8 +794,7 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) { if (pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
", old:%" PRId64,
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr); syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime; pMgr->peerStartTime = pMsg->startTime;
...@@ -1141,8 +1140,8 @@ int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI ...@@ -1141,8 +1140,8 @@ int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
if (pMgr) { if (pMgr) {
sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
pDestId->addr, terrstr(), index); terrstr(), index);
(void)syncLogReplReset(pMgr); (void)syncLogReplReset(pMgr);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册