diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index aa8d3bef517908f402667629bfa023c2045e18d1..e882f7461d418844616424943cdb9bcd5f32ea66 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -70,6 +70,7 @@ typedef struct SSyncTimer { uint64_t logicClock; uint64_t counter; int32_t timerMS; + int64_t timeStamp; SRaftId destId; int64_t hbDataRid; } SSyncTimer; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 7ceec29be4d0c74b6b86b95e5f5a64ee9cc83fc2..6535f77fbec3406b624a77506ca0c401cb2af673 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -35,6 +35,7 @@ typedef struct SyncTimeout { ESyncTimeoutType timeoutType; uint64_t logicClock; int32_t timerMS; + int64_t timeStamp; void* data; // need optimized } SyncTimeout; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7da610a9edf626ea6de28234ad206ab1c712ec86..eae931d9898f5d3afa2ce37840be935852a9f35b 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -48,7 +48,7 @@ extern "C" { // /\ UNCHANGED <> int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); -int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg); +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg, const char* debugStr); int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2aaa13f95d529447bdaf3542c1eae8b2b91d9e7b..b926cf2cf52b4bfc1f502bc295c842c2f67860bc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -691,6 +691,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa pSyncTimer->timerMS = pSyncNode->hbBaseLine; pSyncTimer->timerCb = syncNodeEqPeerHeartbeatTimer; pSyncTimer->destId = destId; + pSyncTimer->timeStamp = taosGetTimestampMs(); atomic_store_64(&pSyncTimer->logicClock, 0); return 0; } @@ -704,6 +705,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->rid = syncHbTimerDataAdd(pData); } pSyncTimer->hbDataRid = pData->rid; + pSyncTimer->timeStamp = taosGetTimestampMs(); pData->syncNodeRid = pSyncNode->rid; pData->pTimer = pSyncTimer; @@ -1897,7 +1899,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } - sTrace("enqueue ping msg"); + // sTrace("enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue ping msg since %s", terrstr()); @@ -2041,8 +2043,15 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { pSyncMsg->privateTerm = 0; pSyncMsg->timeStamp = taosGetTimestampMs(); + // update reset time + int64_t tsNow = taosGetTimestampMs(); + int64_t timerElapsed = tsNow - pSyncTimer->timeStamp; + pSyncTimer->timeStamp = tsNow; + char logBuf[64]; + snprintf(logBuf, sizeof(logBuf), "timer-elapsed:%" PRId64, timerElapsed); + // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, logBuf); } else { sTrace("vgId:%d, do not send hb, timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", pSyncNode->vgId, @@ -2151,8 +2160,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeat* pMsg = pRpcMsg->pCont; int64_t tsMs = taosGetTimestampMs(); + int64_t timeDiff = tsMs - pMsg->timeStamp; char buf[128]; - snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff); syncLogRecvHeartbeat(ths, pMsg, buf); SRpcMsg rpcMsg = {0}; @@ -2229,8 +2239,9 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; int64_t tsMs = taosGetTimestampMs(); + int64_t timeDiff = tsMs - pMsg->timeStamp; char buf[128]; - snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs); + snprintf(buf, sizeof(buf), "net elapsed:%" PRId64, timeDiff); syncLogRecvHeartbeatReply(ths, pMsg, buf); // update last reply time, make decision whether the other node is alive or not diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index ce984199803fa07512cb0aee57bc95126dd9f820..28a8a2e9954071933e358e750aa16fc7065c5374 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -35,6 +35,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l pTimeout->timeoutType = timeoutType; pTimeout->logicClock = logicClock; pTimeout->timerMS = timerMS; + pTimeout->timeStamp = taosGetTimestampMs(); pTimeout->data = pNode; return 0; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 54c29febe5624a8be0068ef1df635a0bee01ed73..27f6e855d661fdce2e036bc2c7600c96140b1178 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -207,8 +207,8 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest return ret; } -int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) { - syncLogSendHeartbeat(pSyncNode, pMsg->pCont, ""); +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg, const char* debugStr) { + syncLogSendHeartbeat(pSyncNode, pMsg->pCont, debugStr); return syncNodeSendMsgById(destId, pSyncNode, pMsg); } @@ -231,7 +231,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { pSyncMsg->timeStamp = ts; // send msg - syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); + syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg, "x"); } return 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1e5a268e9768060bc0fcddfdcae6f887f1610924..2908e7b945aed11cad23691e9a88dbb377514c51 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -396,8 +396,11 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", - syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); + int64_t tsNow = taosGetTimestampMs(); + int64_t timeDIff = tsNow - pMsg->timeStamp; + sNTrace( + pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s", + syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, timeDIff, pMsg->data, s); } void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {