提交 9be864a6 编写于 作者: M Minghao Li

refactor(sync): make leader life longer

上级 998e2447
......@@ -448,7 +448,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
}
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code);
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
code);
syncNodeRelease(pSyncNode);
if (code != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
......@@ -629,8 +630,8 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
......@@ -707,7 +708,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
}
setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 1300);
setElectTimerMS(pVnode->sync, 2800);
setHeartbeatTimerMS(pVnode->sync, 900);
return 0;
}
......
......@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 5000
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 900
#define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
......@@ -730,7 +730,8 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
for (int i = 0; i < arrSize; ++i) {
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize);
snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType),
arrSize);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
......@@ -834,7 +835,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1;
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType));
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
......@@ -1114,7 +1116,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
}
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
// ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
......@@ -1250,6 +1252,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
} else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
}
......@@ -1281,6 +1290,14 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
}
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine,
2 * pSyncNode->electBaseLine, electMS);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret;
}
......@@ -1293,6 +1310,13 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
} else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret;
}
......@@ -1304,6 +1328,13 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
} else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", 1);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret;
}
......@@ -1312,6 +1343,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL;
sTrace("vgId:%d, stop heartbeat timer", pSyncNode->vgId);
return ret;
}
......@@ -1559,12 +1592,13 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
"lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, printStr);
pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser,
printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1894,7 +1928,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
#if 0 // simon
#if 0 // simon
syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex(pSyncNode);
......
......@@ -141,7 +141,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
", match-index:%d, raftid:%" PRId64,
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
syncNodeRestartNowHeartbeatTimer(pSyncNode);
// syncNodeRestartNowHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer(pSyncNode);
return -1;
}
......
......@@ -48,14 +48,16 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter);
sInfo("vgId:%d, sync timeout, type:election count:%d", ths->vgId, ths->electTimerCounter);
sInfo("vgId:%d, sync timeout, type:election count:%d, electTimerLogicClockUser:%ld", ths->vgId,
ths->electTimerCounter, ths->electTimerLogicClockUser);
syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d", ths->vgId, ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId,
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
syncNodeReplicate(ths);
}
} else {
......
......@@ -125,7 +125,10 @@ int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
ASSERT(min > 0 && max > 0 && max >= min);
return min + syncUtilRand(max - min);
int32_t rdm = min + syncUtilRand(max - min);
// sDebug("random min:%d, max:%d, rdm:%d", min, max, rdm);
return rdm;
}
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册