diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b6d0aecfd1d6f07eb4769a4f30a371c825c7c69b..1b65d95cb188a09e17ea729e3cb6bd5d41ab6409 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -43,6 +43,9 @@ extern "C" { #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 #define SYNC_HEART_TIMEOUT_MS 1000 * 8 +#define SYNC_HEARTBEAT_SLOW_MS 1500 +#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 + #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 @@ -226,6 +229,8 @@ int32_t syncEndSnapshot(int64_t rid); int32_t syncLeaderTransfer(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); bool syncIsReadyForRead(int64_t rid); +bool syncSnapshotSending(int64_t rid); +bool syncSnapshotRecving(int64_t rid); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 93ea138c841ec4b2d098ef534c5f74021f97e9b8..d764901ab38f7762a18380a1db903e3bbd099e13 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -195,6 +195,8 @@ typedef struct SSyncNode { int32_t electNum; int32_t becomeLeaderNum; int32_t configChangeNum; + int32_t hbSlowNum; + int32_t hbrSlowNum; bool isStart; @@ -239,6 +241,8 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode); +bool syncNodeSnapshotSending(SSyncNode* pSyncNode); +bool syncNodeSnapshotRecving(SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 62eba3208e44582f7088de612b754e409c8babc1..19a3bd06cfe665e98599565b81ac426487c32146 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -447,6 +447,28 @@ bool syncIsReadyForRead(int64_t rid) { return ready; } +bool syncSnapshotSending(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + return false; + } + + bool b = syncNodeSnapshotSending(pSyncNode); + syncNodeRelease(pSyncNode); + return b; +} + +bool syncSnapshotRecving(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + return false; + } + + bool b = syncNodeSnapshotRecving(pSyncNode); + syncNodeRelease(pSyncNode); + return b; +} + int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { if (pSyncNode->peersNum == 0) { sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId); @@ -1013,6 +1035,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->electNum = 0; pSyncNode->becomeLeaderNum = 0; pSyncNode->configChangeNum = 0; + pSyncNode->hbSlowNum = 0; + pSyncNode->hbrSlowNum = 0; sNTrace(pSyncNode, "sync open, node:%p", pSyncNode); @@ -1563,6 +1587,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->leaderCache = EMPTY_RAFT_ID; } + pSyncNode->hbSlowNum = 0; + // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); @@ -1607,6 +1633,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->leaderTime = taosGetTimestampMs(); pSyncNode->becomeLeaderNum++; + pSyncNode->hbrSlowNum = 0; // reset restoreFinish pSyncNode->restoreFinish = false; @@ -2167,6 +2194,25 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { return b; } +bool syncNodeSnapshotSending(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) return false; + bool b = false; + for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { + if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) { + b = true; + break; + } + } + return b; +} + +bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) return false; + if (pSyncNode->pNewNodeReceiver == NULL) return false; + if (pSyncNode->pNewNodeReceiver->start) return true; + return false; +} + static int32_t syncNodeAppendNoop(SSyncNode* ths) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index c022a3e75bc0ca8b1a51a979768b13461d8c5c6c..552cf9e7eb1d640869b4c72b411ae7c59f4eda75 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -283,13 +283,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo "vgId:%d, sync %s " "%s" ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%" PRId64 + ", snap-tm:%" PRIu64 + ", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, - pNode->configChangeNum, cacheHit, cacheMiss, aqItems, pNode->snapshottingIndex, pNode->replicaNum, - pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, - pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, hbrTimeStr); + pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, + pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, + pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, + hbTimeStr, hbrTimeStr); } } @@ -463,12 +465,23 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool } void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) { + if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) { + pSyncNode->hbSlowNum++; + + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNInfo(pSyncNode, + "recv sync-heartbeat from %s:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 + "}, net elapsed:%" PRId64, + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff); + } + if (!(sDebugFlag & DEBUG_TRACE)) return; char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64, @@ -487,6 +500,17 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p } void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) { + if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) { + pSyncNode->hbrSlowNum++; + + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, + "recv sync-heartbeat-reply from %s:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64, host, + port, pMsg->term, pMsg->timeStamp, timeDiff); + } + if (!(sDebugFlag & DEBUG_TRACE)) return; char host[64];