提交 28a5a9d7 编写于 作者: M Minghao Li

refactor(sync): add log, update quorum first when reconfig

上级 1df2703c
......@@ -157,6 +157,8 @@ typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType;
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
typedef struct SyncTimeout {
uint32_t bytes;
int32_t vgId;
......@@ -677,7 +679,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
......
......@@ -503,7 +503,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
......
......@@ -388,7 +388,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
......
......@@ -232,12 +232,8 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode);
int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms);
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
......@@ -313,6 +309,8 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
// trace log
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
......
......@@ -39,7 +39,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>>
//
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
#ifdef __cplusplus
}
......
......@@ -1174,7 +1174,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnClientRequest = syncNodeOnClientRequest;
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
pSyncNode->FpOnTimeout = syncNodeOnTimer;
pSyncNode->FpOnSnapshot = syncNodeOnSnapshot;
pSyncNode->FpOnSnapshotReply = syncNodeOnSnapshotReply;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVote;
......@@ -1259,7 +1259,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "first start");
}
if (pSyncNode->vgId == 1) {
if (syncNodeIsMnode(pSyncNode)) {
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
......@@ -1486,47 +1486,32 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
if (syncNodeIsMnode(pSyncNode)) {
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
} else {
do {
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStart(pSyncNode, pSyncTimer);
}
} while (0);
}
return ret;
}
#if 0
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
#endif
int32_t syncNodeStartHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->heartbeatTimerMS = ms;
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
return ret;
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStart(pSyncNode, pSyncTimer);
}
int32_t syncNodeStartHeartbeatTimerNow(SSyncNode* pSyncNode) {
pSyncNode->heartbeatTimerMS = 1;
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
return ret;
}
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
#if 0
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL;
#endif
sTrace("vgId:%d, sync %s stop heartbeat timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
do {
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStop(pSyncNode, pSyncTimer);
}
} while (0);
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
syncHbTimerStop(pSyncNode, pSyncTimer);
}
return ret;
}
......@@ -1537,18 +1522,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
return 0;
}
int32_t syncNodeRestartHeartbeatTimerNow(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimerNow(pSyncNode);
return 0;
}
int32_t syncNodeRestartNowHeartbeatTimerMS(SSyncNode* pSyncNode, int32_t ms) {
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimerMS(pSyncNode, ms);
return 0;
}
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
......@@ -2025,13 +1998,14 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
// update quorum first
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
// reset snapshot senders
// clear new
......@@ -3355,6 +3329,25 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true;
}
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
if (timerType == SYNC_TIMEOUT_PING) {
return "ping";
} else if (timerType == SYNC_TIMEOUT_ELECTION) {
return "elect";
} else if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
return "heartbeat";
} else {
return "unknown";
}
}
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-timer {type:%s, lc:%lu, ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), s, pMsg->logicClock, pMsg->timerMS, pMsg->data);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
......
......@@ -60,12 +60,12 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
syncNodeEventLog(ths, "timer routines");
if (ths->vgId == 1) {
if (syncNodeIsMnode(ths)) {
syncNodeCleanConfigIndex(ths);
}
#if 0
if (ths->vgId != 1) {
if (!syncNodeIsMnode(ths)) {
syncRespClean(ths->pSyncRespMgr);
}
#endif
......@@ -73,9 +73,9 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return 0;
}
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg);
syncLogRecvTimer(ths, pMsg, "");
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
......@@ -84,28 +84,29 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
// syncNodePingAll(ths);
// syncNodePingPeers(ths);
// sTrace("vgId:%d, sync timeout, type:ping count:%d", ths->vgId, ths->pingTimerCounter);
syncNodeTimerRoutine(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter);
sTrace("vgId:%d, sync timer, type:election count:%d, electTimerLogicClockUser:%ld", ths->vgId,
ths->electTimerCounter, ths->electTimerLogicClockUser);
sTrace("vgId:%d, sync timer, type:election count:%d, lc-user:%ld", ths->vgId, ths->electTimerCounter,
ths->electTimerLogicClockUser);
syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter);
sTrace("vgId:%d, sync timer, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId,
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
sTrace("vgId:%d, sync timer, type:replicate count:%d, lc-user:%ld", ths->vgId, ths->heartbeatTimerCounter,
ths->heartbeatTimerLogicClockUser);
// syncNodeReplicate(ths, true);
}
} else {
sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
sError("vgId:%d, recv unknown timer-type:%d", ths->vgId, pMsg->timeoutType);
}
return ret;
......
......@@ -130,7 +130,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -259,7 +259,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册