From 4a25963732d3c26ff25b13b8b3af7fc95bf906d4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 26 Nov 2022 10:46:57 +0800 Subject: [PATCH] refactor(sync): optimized heartbeat timer --- source/libs/sync/inc/syncEnv.h | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncMain.c | 71 +++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 04e8e5edd4..265da9703d 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -29,6 +29,7 @@ extern "C" { #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define HEARTBEAT_TIMER_MS 1000 +#define HEARTBEAT_TICK_NUM 20 typedef struct SSyncEnv { uint8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e882f7461d..22ae922f62 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -61,6 +61,7 @@ typedef struct SSyncHbTimerData { SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; + int64_t execTime; int64_t rid; } SSyncHbTimerData; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3de14b917f..7881ee0f3a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -711,9 +711,10 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; + pData->execTime = taosGetTimestampMs() + pSyncTimer->timerMS; - taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager, - &pSyncTimer->pTimer); + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), + syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } @@ -1979,6 +1980,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { int64_t hbDataRid = (int64_t)param; + int64_t tsNow = taosGetTimestampMs(); SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid); if (pData == NULL) { @@ -2023,36 +2025,53 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { int64_t msgLogicClock = atomic_load_64(&pData->logicClock); if (timerLogicClock == msgLogicClock) { + if (tsNow > pData->execTime) { +#if 0 + sTrace( + "vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, " + "---------", + pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); +#endif + + pData->execTime += pSyncTimer->timerMS; + + SRpcMsg rpcMsg = {0}; + (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); + + SyncHeartbeat* pSyncMsg = rpcMsg.pCont; + pSyncMsg->srcId = pSyncNode->myRaftId; + pSyncMsg->destId = pData->destId; + pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; + pSyncMsg->commitIndex = pSyncNode->commitIndex; + pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); + pSyncMsg->privateTerm = 0; + pSyncMsg->timeStamp = taosGetTimestampMs(); + + // update reset time + int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; + pSyncTimer->timeStamp = tsNow; + char logBuf[64]; + snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64 ", next-exec:%" PRId64, timerElapsed, + pData->execTime); + + // send msg + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); + } else { +#if 0 + sTrace( + "vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------", + pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime); +#endif + } + if (syncIsInit()) { // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); - taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid, syncEnv()->pTimerManager, - &pSyncTimer->pTimer); + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid, + syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("sync env is stop, reset peer hb timer error"); } - SRpcMsg rpcMsg = {0}; - (void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); - - SyncHeartbeat* pSyncMsg = rpcMsg.pCont; - pSyncMsg->srcId = pSyncNode->myRaftId; - pSyncMsg->destId = pData->destId; - pSyncMsg->term = pSyncNode->pRaftStore->currentTerm; - pSyncMsg->commitIndex = pSyncNode->commitIndex; - pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); - pSyncMsg->privateTerm = 0; - pSyncMsg->timeStamp = taosGetTimestampMs(); - - // update reset time - int64_t tsNow = taosGetTimestampMs(); - int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; - pSyncTimer->timeStamp = tsNow; - char logBuf[64]; - snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed); - - // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); - } else { sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId, timerLogicClock, msgLogicClock); -- GitLab