From 28a5a9d7e117d86d684af45e0b58f6c381a1b36e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 19 Oct 2022 10:57:50 +0800 Subject: [PATCH] refactor(sync): add log, update quorum first when reconfig --- include/libs/sync/syncTools.h | 4 +- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/sync/inc/syncInt.h | 6 +- source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncMain.c | 85 ++++++++++++-------------- source/libs/sync/src/syncTimeout.c | 21 ++++--- source/libs/sync/src/syncVoteMgr.c | 4 +- 8 files changed, 60 insertions(+), 66 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 92e1e47d0a..07b2da1632 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -157,6 +157,8 @@ typedef enum ESyncTimeoutType { SYNC_TIMEOUT_HEARTBEAT, } ESyncTimeoutType; +const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); + typedef struct SyncTimeout { uint32_t bytes; int32_t vgId; @@ -677,7 +679,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // on message ---------------------- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); +int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 25c194ce75..acaec3e2e8 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -503,7 +503,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + code = syncNodeOnTimer(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5501f261df..e703164af4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -388,7 +388,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + code = syncNodeOnTimer(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING) { diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3c3fbc7574..29d0d6ccc0 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -232,12 +232,8 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode); -int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode); -int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms); // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); @@ -313,6 +309,8 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); // trace log +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); + void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 25c26c909d..112a3d8610 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -39,7 +39,7 @@ extern "C" { // /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]] // /\ UNCHANGED <> // -int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); +int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4325503622..c8609be23b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1174,7 +1174,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnClientRequest = syncNodeOnClientRequest; - pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; + pSyncNode->FpOnTimeout = syncNodeOnTimer; pSyncNode->FpOnSnapshot = syncNodeOnSnapshot; pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply; pSyncNode->FpOnRequestVote = syncNodeOnRequestVote; @@ -1259,7 +1259,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeBecomeFollower(pSyncNode, "first start"); } - if (pSyncNode->vgId == 1) { + if (syncNodeIsMnode(pSyncNode)) { int32_t ret = 0; ret = syncNodeStartPingTimer(pSyncNode); ASSERT(ret == 0); @@ -1486,47 +1486,32 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncNodeIsMnode(pSyncNode)) { - pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; - ret = syncNodeDoStartHeartbeatTimer(pSyncNode); - } else { - do { - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); - syncHbTimerStart(pSyncNode, pSyncTimer); - } - } while (0); - } - - return ret; -} +#if 0 + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; + ret = syncNodeDoStartHeartbeatTimer(pSyncNode); +#endif -int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { - pSyncNode->heartbeatTimerMS = ms; - int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); - return ret; -} + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + syncHbTimerStart(pSyncNode, pSyncTimer); + } -int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode) { - pSyncNode->heartbeatTimerMS = 1; - int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); return ret; } int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; + +#if 0 atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); taosTmrStop(pSyncNode->pHeartbeatTimer); pSyncNode->pHeartbeatTimer = NULL; +#endif - sTrace("vgId:%d, sync %s stop heartbeat timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); - - do { - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); - syncHbTimerStop(pSyncNode, pSyncTimer); - } - } while (0); + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + syncHbTimerStop(pSyncNode, pSyncTimer); + } return ret; } @@ -1537,18 +1522,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode) { - syncNodeStopHeartbeatTimer(pSyncNode); - syncNodeStartHeartbeatTimerNow(pSyncNode); - return 0; -} - -int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) { - syncNodeStopHeartbeatTimer(pSyncNode); - syncNodeStartHeartbeatTimerMS(pSyncNode, ms); - return 0; -} - // utils -------------- int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; @@ -2025,13 +1998,14 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } + // update quorum first + pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); - pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); - // reset snapshot senders // clear new @@ -3355,6 +3329,25 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } +const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) { + if (timerType == SYNC_TIMEOUT_PING) { + return "ping"; + } else if (timerType == SYNC_TIMEOUT_ELECTION) { + return "elect"; + } else if (timerType == SYNC_TIMEOUT_HEARTBEAT) { + return "heartbeat"; + } else { + return "unknown"; + } +} + +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-timer {type:%s, lc:%lu, ms:%d, data:%p}, %s", + syncTimerTypeStr(pMsg->timeoutType), s, pMsg->logicClock, pMsg->timerMS, pMsg->data); + syncNodeEventLog(pSyncNode, logBuf); +} + void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char host[64]; uint16_t port; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index ed09922b9e..62a81133f3 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -60,12 +60,12 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t syncNodeTimerRoutine(SSyncNode* ths) { syncNodeEventLog(ths, "timer routines"); - if (ths->vgId == 1) { + if (syncNodeIsMnode(ths)) { syncNodeCleanConfigIndex(ths); } #if 0 - if (ths->vgId != 1) { + if (!syncNodeIsMnode(ths)) { syncRespClean(ths->pSyncRespMgr); } #endif @@ -73,9 +73,9 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { return 0; } -int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { +int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; - syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg); + syncLogRecvTimer(ths, pMsg, ""); if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { @@ -84,28 +84,29 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { // syncNodePingAll(ths); // syncNodePingPeers(ths); - // sTrace("vgId:%d, sync timeout, type:ping count:%d", ths->vgId, ths->pingTimerCounter); syncNodeTimerRoutine(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->electTimerCounter); - sTrace("vgId:%d, sync timer, type:election count:%d, electTimerLogicClockUser:%ld", ths->vgId, - ths->electTimerCounter, ths->electTimerLogicClockUser); + sTrace("vgId:%d, sync timer, type:election count:%d, lc-user:%ld", ths->vgId, ths->electTimerCounter, + ths->electTimerLogicClockUser); + syncNodeElect(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->heartbeatTimerCounter); - sTrace("vgId:%d, sync timer, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId, - ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser); + sTrace("vgId:%d, sync timer, type:replicate count:%d, lc-user:%ld", ths->vgId, ths->heartbeatTimerCounter, + ths->heartbeatTimerLogicClockUser); + // syncNodeReplicate(ths, true); } } else { - sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType); + sError("vgId:%d, recv unknown timer-type:%d", ths->vgId, pMsg->timeoutType); } return ret; diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 641bb32d2d..09b79825d0 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -130,7 +130,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { char *voteGranted2Str(SVotesGranted *pVotesGranted) { cJSON *pJson = voteGranted2Json(pVotesGranted); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -259,7 +259,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { char *votesRespond2Str(SVotesRespond *pVotesRespond) { cJSON *pJson = votesRespond2Json(pVotesRespond); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } -- GitLab