提交 b7e22154 编写于 作者: M Minghao Li

sync timeout

上级 b75da82c
......@@ -121,14 +121,17 @@ typedef struct SSyncNode {
// init internal
SNodeInfo me;
SRaftId raftId;
int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA];
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
SRaftId replicasId[TSDB_MAX_REPLICA];
// raft algorithm
SSyncFSM* pFsm;
SRaftId raftId;
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
int32_t quorum;
// life cycle
......
......@@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
......@@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#endif
#ifdef __cplusplus
}
......
......@@ -24,13 +24,36 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "taosdef.h"
typedef struct SVotesGranted {
SyncTerm term;
int32_t quorum;
int32_t votes;
bool toLeader;
SSyncNode *pSyncNode;
} SVotesGranted;
typedef struct SVotesResponded {
} SVotesResponded;
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted);
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
bool isRespond[TSDB_MAX_REPLICA];
int32_t replicaNum;
SyncTerm term;
SSyncNode *pSyncNode;
} SVotesRespond;
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void Reset(SVotesRespond *pVotesRespond, SyncTerm term);
#ifdef __cplusplus
}
......
......@@ -25,7 +25,6 @@ static int32_t tsNodeRefId = -1;
// ------ local funciton ---------
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId);
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
......@@ -358,25 +357,6 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
return ret;
}
static void syncNodePingTimerCb(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
++(pSyncNode->pingTimerCounter);
sTrace(
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
"tmrId:%p ",
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
syncNodePingAll(pSyncNode);
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
}
}
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
......
......@@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft
raftId->vgId = vgId;
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
return ret;
}
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
......@@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
}
#endif
\ No newline at end of file
......@@ -14,3 +14,83 @@
*/
#include "syncVoteMgr.h"
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
assert(pVotesGranted != NULL);
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->quorum = pSyncNode->quorum;
pVotesGranted->term = 0;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode;
return pVotesGranted;
}
void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
if (pVotesGranted != NULL) {
free(pVotesGranted);
}
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret;
}
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
assert(pMsg->voteGranted == true);
assert(pMsg->term == pVotesGranted->term);
pVotesGranted->votes++;
}
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->term = term;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
}
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
assert(pVotesRespond != NULL);
memset(pVotesRespond, 0, sizeof(SVotesRespond));
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode;
return pVotesRespond;
}
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
bool ret = false;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
ret = true;
break;
}
}
return ret;
}
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
assert(pVotesRespond->term == pMsg->term);
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) {
assert(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true;
return;
}
}
assert(0);
}
void Reset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond->term = term;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
pVotesRespond->isRespond[i] = false;
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册