From 9be864a6bed6e68707c0be2e65dc18fb70efd716 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 4 Aug 2022 20:36:12 +0800 Subject: [PATCH] refactor(sync): make leader life longer --- source/dnode/vnode/src/vnd/vnodeSync.c | 9 ++--- source/libs/sync/inc/syncEnv.h | 12 +++---- source/libs/sync/src/syncMain.c | 46 ++++++++++++++++++++++---- source/libs/sync/src/syncReplication.c | 3 +- source/libs/sync/src/syncTimeout.c | 6 ++-- source/libs/sync/src/syncUtil.c | 5 ++- 6 files changed, 61 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f3550bf2ee..60b00ae2e0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -448,7 +448,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } - vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code); + vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), + code); syncNodeRelease(pSyncNode); if (code != 0 && terrno == 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -629,8 +630,8 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; - vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, - pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); + vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, + pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); @@ -707,7 +708,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { } setPingTimerMS(pVnode->sync, 5000); - setElectTimerMS(pVnode->sync, 1300); + setElectTimerMS(pVnode->sync, 2800); setHeartbeatTimerMS(pVnode->sync, 900); return 0; } diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index dd032f1481..e6d2bd4920 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,13 +28,13 @@ extern "C" { #include "trpc.h" #include "ttimer.h" -#define TIMER_MAX_MS 0x7FFFFFFF -#define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 5000 -#define ELECT_TIMER_MS_MIN 1300 -#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) +#define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 +#define PING_TIMER_MS 5000 +#define ELECT_TIMER_MS_MIN 5000 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 900 +#define HEARTBEAT_TIMER_MS 900 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f6f1f4e032..37d0dff095 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -730,7 +730,8 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs for (int i = 0; i < arrSize; ++i) { do { char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize); + snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), + arrSize); syncNodeEventLog(pSyncNode, eventLog); } while (0); @@ -834,7 +835,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { rpcFreeCont(rpcMsg.pCont); syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); ret = 1; - sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType)); } else { ret = -1; terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1114,7 +1116,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { } int32_t ret = 0; - ret = syncNodeStartPingTimer(pSyncNode); + // ret = syncNodeStartPingTimer(pSyncNode); ASSERT(ret == 0); } @@ -1250,6 +1252,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pElectTimer); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); } @@ -1281,6 +1290,14 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); } ret = syncNodeRestartElectTimer(pSyncNode, electMS); + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, + 2 * pSyncNode->electBaseLine, electMS); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + return ret; } @@ -1293,6 +1310,13 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); } + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + return ret; } @@ -1304,6 +1328,13 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { } else { sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); } + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", 1); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + return ret; } @@ -1312,6 +1343,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); taosTmrStop(pSyncNode->pHeartbeatTimer); pSyncNode->pHeartbeatTimer = NULL; + sTrace("vgId:%d, stop heartbeat timer", pSyncNode->vgId); + return ret; } @@ -1559,12 +1592,13 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ", sby:%d, " "stgy:%d, bch:%d, " "r-num:%d, " - "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", + "lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, - pSyncNode->restoreFinish, printStr); + pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, + printStr); } else { snprintf(logBuf, sizeof(logBuf), "%s", str); } @@ -1894,7 +1928,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); -#if 0 // simon +#if 0 // simon syncNodeReplicate(pSyncNode); #endif syncMaybeAdvanceCommitIndex(pSyncNode); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 12d6797349..4f2dbcae67 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -141,7 +141,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ", match-index:%d, raftid:%" PRId64, pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - syncNodeRestartNowHeartbeatTimer(pSyncNode); + // syncNodeRestartNowHeartbeatTimer(pSyncNode); + syncNodeStartNowHeartbeatTimer(pSyncNode); return -1; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 4ae803d744..15b1054e63 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -48,14 +48,16 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->electTimerCounter); - sInfo("vgId:%d, sync timeout, type:election count:%d", ths->vgId, ths->electTimerCounter); + sInfo("vgId:%d, sync timeout, type:election count:%d, electTimerLogicClockUser:%ld", ths->vgId, + ths->electTimerCounter, ths->electTimerLogicClockUser); syncNodeElect(ths); } } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->heartbeatTimerCounter); - sInfo("vgId:%d, sync timeout, type:replicate count:%d", ths->vgId, ths->heartbeatTimerCounter); + sInfo("vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId, + ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser); syncNodeReplicate(ths); } } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 5b73d980c4..15e94baee4 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -125,7 +125,10 @@ int32_t syncUtilRand(int32_t max) { return taosRand() % max; } int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { ASSERT(min > 0 && max > 0 && max >= min); - return min + syncUtilRand(max - min); + int32_t rdm = min + syncUtilRand(max - min); + + // sDebug("random min:%d, max:%d, rdm:%d", min, max, rdm); + return rdm; } int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } -- GitLab