From 39ddf9faf875f543b3a1d6cb39d2866412cae42c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 25 Nov 2022 21:51:57 +0800 Subject: [PATCH] refactor: adjust syncLogHeartbeat --- source/libs/sync/inc/syncReplication.h | 2 +- source/libs/sync/inc/syncUtil.h | 4 ++-- source/libs/sync/src/syncMain.c | 23 ++++++++++------------- source/libs/sync/src/syncReplication.c | 6 +++--- source/libs/sync/src/syncUtil.c | 26 +++++++++++++++++++------- 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index eae931d989..7da610a9ed 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -48,7 +48,7 @@ extern "C" { // /\ UNCHANGED <> int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); -int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg, const char* debugStr); +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg); int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index b7ee320aa5..8b63b675cf 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -94,8 +94,8 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b926cf2cf5..40a6275f81 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2034,6 +2034,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + // update reset time + int64_t tsNow = taosGetTimestampMs(); + int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; + pSyncTimer->timeStamp = tsNow; + SyncHeartbeat* pSyncMsg = rpcMsg.pCont; pSyncMsg->srcId = pSyncNode->myRaftId; pSyncMsg->destId = pData->destId; @@ -2041,17 +2046,11 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; - pSyncMsg->timeStamp = taosGetTimestampMs(); - - // update reset time - int64_t tsNow = taosGetTimestampMs(); - int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; - pSyncTimer->timeStamp = tsNow; - char logBuf[64]; - snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed); + pSyncMsg->timeStamp = tsNow; // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); + syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed); + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } else { sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId, @@ -2161,9 +2160,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int64_t tsMs = taosGetTimestampMs(); int64_t timeDiff = tsMs - pMsg->timeStamp; - char buf[128]; - snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff); - syncLogRecvHeartbeat(ths, pMsg, buf); + syncLogRecvHeartbeat(ths, pMsg, timeDiff); SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); @@ -2173,7 +2170,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->srcId = ths->myRaftId; pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->privateTerm = 8864; // magic number - pMsgReply->timeStamp = taosGetTimestampMs(); + pMsgReply->timeStamp = tsMs; if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 27f6e855d6..9dbd9fb370 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -207,8 +207,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest return ret; } -int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg, const char* debugStr) { - syncLogSendHeartbeat(pSyncNode, pMsg->pCont, debugStr); +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { return syncNodeSendMsgById(destId, pSyncNode, pMsg); } @@ -231,7 +230,8 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->timeStamp = ts; // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, "x"); + syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0); + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); } return 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 2908e7b945..6c7f55bf9e 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -430,25 +430,37 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); + if (printX) { + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 + "}, x", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp); + } else { + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 + "}, timer-elapsed:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed); + } } -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) { + if (!(sDebugFlag & DEBUG_TRACE)) return; + char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s); + "}, net elapsed:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff); } void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { -- GitLab