diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 1e72b4eb261ff1f7137df85ccddb0af8fe17e030..aa8d3bef517908f402667629bfa023c2045e18d1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,7 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); -bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode); +bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index acaf499cbbfa90a81814a4ceb8e0e3d8d546b784..7ceec29be4d0c74b6b86b95e5f5a64ee9cc83fc2 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -124,6 +124,7 @@ typedef struct SyncHeartbeat { SyncIndex commitIndex; SyncTerm privateTerm; SyncTerm minMatchIndex; + int64_t timeStamp; } SyncHeartbeat; typedef struct SyncHeartbeatReply { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 008e49c20c039eacc07d059fadbffae7b25c5760..7dab496a5b24a87395404368594a64f5362eff2e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -640,7 +640,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } // heartbeat timeout - if (syncNodeHeartbeatTimeout(pSyncNode)) { + if (syncNodeHeartbeatReplyTimeout(pSyncNode)) { terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); @@ -2039,6 +2039,7 @@ static void 
syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->privateTerm = 0; + pSyncMsg->timeStamp = taosGetTimestampMs(); // send msg syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); @@ -2094,7 +2095,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand return code; } -bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) { +bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { if (pSyncNode->replicaNum == 1) { return false; } @@ -2148,7 +2149,11 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = pRpcMsg->pCont; - syncLogRecvHeartbeat(ths, pMsg, ""); + + int64_t tsMs = taosGetTimestampMs(); + char buf[128]; + snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + syncLogRecvHeartbeat(ths, pMsg, buf); SRpcMsg rpcMsg = {0}; (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); @@ -2161,6 +2166,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->timeStamp = taosGetTimestampMs(); if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { + syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs); + syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; @@ -2220,9 +2227,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; - syncLogRecvHeartbeatReply(ths, pMsg, ""); int64_t tsMs = taosGetTimestampMs(); + char buf[128]; + snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + syncLogRecvHeartbeatReply(ths, pMsg, buf); // update last reply time, make decision whether the other node is alive or not syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); diff --git 
a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c
index de5f71e5a99222ed433ffaa192766c5e8b4fdac4..54c29febe5624a8be0068ef1df635a0bee01ed73 100644
--- a/source/libs/sync/src/syncReplication.c
+++ b/source/libs/sync/src/syncReplication.c
@@ -213,9 +213,11 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcM
 }
 
 int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
+  int64_t ts = taosGetTimestampMs();
   for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
     SRpcMsg rpcMsg = {0};
     if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) {
+      sError("vgId:%d, build sync-heartbeat error", pSyncNode->vgId);
       continue;
     }
 
@@ -226,6 +228,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
     pSyncMsg->commitIndex = pSyncNode->commitIndex;
     pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
     pSyncMsg->privateTerm = 0;
+    pSyncMsg->timeStamp = ts;
 
     // send msg
     syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c
index 4c847862865fbd4be754a9a108924a7967fe2f17..1e5a268e9768060bc0fcddfdcae6f887f1610924 100644
--- a/source/libs/sync/src/syncUtil.c
+++ b/source/libs/sync/src/syncUtil.c
@@ -15,6 +15,7 @@
 
 #define _DEFAULT_SOURCE
 #include "syncUtil.h"
+#include "syncIndexMgr.h"
 #include "syncMessage.h"
 #include "syncRaftCfg.h"
 #include "syncRaftStore.h"
@@ -175,6 +176,36 @@ void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
   }
 }
 
+// for leader: append "<replica idx>:<recv time of its last heartbeat-reply, ms>" per replica, closing with "}"
+// buf must already hold the 5-byte prefix "hbr:{"; output is cut short rather than written past bufLen
+static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
+  int32_t len = 5;  // skip the "hbr:{" prefix stored by the caller
+  for (int32_t i = 0; i < pSyncNode->replicaNum && len < bufLen - 1; ++i) {
+    int64_t     tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
+    const char* tail = (i < pSyncNode->replicaNum - 1) ? "," : "}";
+    int32_t     n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "%s", i, tsMs, tail);
+    if (n < 0 || n >= bufLen - len) {
+      break;  // error or truncation: adding n would push len past bufLen and make the next size negative
+    }
+    len += n;
+  }
+}
+
+// for
follower
+// appends "<idx>:<heartbeat recv time, ms>" per replica after the "hb:{" prefix already in buf; stops at bufLen
+static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
+  int32_t len = 4;  // skip the "hb:{" prefix stored by the caller
+  for (int32_t i = 0; i < pSyncNode->replicaNum && len < bufLen - 1; ++i) {
+    int64_t     tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
+    const char* tail = (i < pSyncNode->replicaNum - 1) ? "," : "}";
+    int32_t     n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "%s", i, tsMs, tail);
+    if (n < 0 || n >= bufLen - len) {
+      break;  // error or truncation: adding n would push len past bufLen and make the next size negative
+    }
+    len += n;
+  }
+}
+
 static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
   int32_t len = 1;
 
@@ -221,6 +252,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
   char peerStr[1024] = "{";
   syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
 
+  char hbrTimeStr[256] = "hbr:{";
+  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
+
+  char hbTimeStr[256] = "hb:{";
+  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
+
   int32_t quorum = syncNodeDynamicQuorum(pNode);
 
   char eventLog[512];  // {0};
@@ -243,12 +280,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
                  "%s"
                  ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
                  ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64
-                 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
+                 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
                  pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
                  logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
                  pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
                  pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
-                 pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
+                 pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr,
hbTimeStr,
+                 hbrTimeStr);
   }
 }
 
@@ -395,9 +433,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
   syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
 
   sNTrace(pSyncNode,
-          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
-          "}, %s",
-          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
+          "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s",
+          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
 }
 
 void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
@@ -406,9 +443,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
   syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
 
   sNTrace(pSyncNode,
-          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
+          "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
           "}, %s",
-          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
+          host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
 }
 
 void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
@@ -416,8 +453,9 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
   uint16_t port;
   syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
 
-  sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
-          pMsg->term, pMsg->privateTerm, s);
+  // host:port was decoded from pMsg->destId, i.e. the peer this reply is sent to, so the log says "to"
+  sNTrace(pSyncNode, "send sync-heartbeat-reply to %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
+          pMsg->term, pMsg->timeStamp, s);
 }
 
 void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {