未验证 提交 29456c06 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #17649 from taosdata/feature/sync2-merge

refactor(sync): adjust elect timer
......@@ -79,6 +79,12 @@ typedef struct SSyncTimer {
void* pData;
} SSyncTimer;
typedef struct SElectTimer {
uint64_t logicClock;
SSyncNode* pSyncNode;
void* pData;
} SElectTimer;
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
......@@ -155,7 +161,6 @@ typedef struct SSyncNode {
tmr_h pElectTimer;
int32_t electTimerMS;
uint64_t electTimerLogicClock;
uint64_t electTimerLogicClockUser;
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
uint64_t electTimerCounter;
......
......@@ -1310,7 +1310,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->pElectTimer = NULL;
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
atomic_store_64(&pSyncNode->electTimerLogicClock, 0);
atomic_store_64(&pSyncNode->electTimerLogicClockUser, 0);
pSyncNode->FpElectTimerCB = syncNodeEqElectTimer;
pSyncNode->electTimerCounter = 0;
......@@ -1565,17 +1564,14 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0;
if (syncEnvIsStart()) {
pSyncNode->electTimerMS = ms;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
/*
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
*/
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
pElectTimer->pSyncNode = pSyncNode;
pElectTimer->pData = NULL;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
} else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
......@@ -1585,11 +1581,10 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncNode->electTimerLogicClockUser, 1);
atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
taosTmrStop(pSyncNode->pElectTimer);
pSyncNode->pElectTimer = NULL;
// sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
return ret;
}
......@@ -1815,8 +1810,6 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock);
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClockUser);
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB);
cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter);
......@@ -1922,7 +1915,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1946,7 +1939,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, peerStateStr, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -2000,7 +1993,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -2022,7 +2015,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
pSyncNode->electTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -2789,38 +2782,46 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
}
static void syncNodeEqElectTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->electTimerLogicClockUser) <= atomic_load_64(&pSyncNode->electTimerLogicClock)) {
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, atomic_load_64(&pSyncNode->electTimerLogicClock),
pSyncNode->electTimerMS, pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return;
}
} else {
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
SElectTimer* pElectTimer = (SElectTimer*)param;
SSyncNode* pSyncNode = pElectTimer->pSyncNode;
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pSyncNode->electTimerMS,
pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
taosMemoryFree(pElectTimer);
return;
}
syncTimeoutDestroy(pSyncMsg);
// reset timer ms
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
} else {
sError("sync env is stop, syncNodeEqElectTimer");
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
} else {
sTrace("syncNodeEqElectTimer FpEqMsg is NULL");
}
syncTimeoutDestroy(pSyncMsg);
taosMemoryFree(pElectTimer);
#if 0
// reset timer ms
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer);
} else {
sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
sError("sync env is stop, syncNodeEqElectTimer");
}
#endif
}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
......@@ -3000,16 +3001,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
// on message ----
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
// log state
char logBuf[1024] = {0};
snprintf(logBuf, sizeof(logBuf),
"==syncNodeOnPingCb== vgId:%d, state: %d, %s, term:%" PRIu64 " electTimerLogicClock:%" PRIu64
", "
"electTimerLogicClockUser:%" PRIu64 ", electTimerMS:%d",
ths->vgId, ths->state, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm,
ths->electTimerLogicClock, ths->electTimerLogicClockUser, ths->electTimerMS);
int32_t ret = 0;
syncPingLog2(logBuf, pMsg);
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
......@@ -3024,7 +3015,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
syncPingReplyDestroy(pMsgReply);
return ret;
return 0;
}
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
......
......@@ -113,10 +113,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) {
++(ths->electTimerCounter);
sTrace("vgId:%d, sync timer, type:election count:%" PRIu64 ", lc-user:%" PRIu64, ths->vgId,
ths->electTimerCounter, ths->electTimerLogicClockUser);
syncNodeElect(ths);
}
......
......@@ -66,7 +66,12 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
ASSERT(pMsg->voteGranted == true);
ASSERT(pMsg->term == pVotesGranted->term);
if (pMsg->term != pVotesGranted->term) {
syncNodeEventLog(pVotesGranted->pSyncNode, "vote grant vnode error");
return;
}
ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
int j = -1;
......@@ -201,7 +206,11 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
}
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
ASSERT(pVotesRespond->term == pMsg->term);
if (pVotesRespond->term != pMsg->term) {
syncNodeEventLog(pVotesRespond->pSyncNode, "vote respond add error");
return;
}
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
// ASSERT(pVotesRespond->isRespond[i] == false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册