提交 237d719d 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
#include "ttimer.h" #include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define PING_TIMER_MS 1000
typedef struct SSyncEnv { typedef struct SSyncEnv {
tmr_h pEnvTickTimer; tmr_h pEnvTickTimer;
......
...@@ -131,6 +131,7 @@ typedef struct SSyncNode { ...@@ -131,6 +131,7 @@ typedef struct SSyncNode {
// raft algorithm // raft algorithm
SSyncFSM* pFsm; SSyncFSM* pFsm;
int32_t quorum; int32_t quorum;
SRaftId leaderCache;
// life cycle // life cycle
int32_t refCount; int32_t refCount;
...@@ -155,7 +156,9 @@ typedef struct SSyncNode { ...@@ -155,7 +156,9 @@ typedef struct SSyncNode {
// timer // timer
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
uint8_t pingTimerEnable; // uint8_t pingTimerEnable;
uint64_t pingTimerLogicClock;
uint64_t pingTimerLogicClockUser;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter; uint64_t pingTimerCounter;
...@@ -193,18 +196,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); ...@@ -193,18 +196,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
void syncNodeBecomeFollower(SSyncNode* pSyncNode); int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
void syncNodeBecomeLeader(SSyncNode* pSyncNode); int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode); int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
void syncNodeLeader2Follower(SSyncNode* pSyncNode); int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
// encode as uint32 // encode as uint32
typedef enum ESyncMessageType { typedef enum ESyncMessageType {
SYNC_UNKNOWN = 77, SYNC_UNKNOWN = 9999,
SYNC_TIMEOUT = 99, SYNC_TIMEOUT = 99,
SYNC_PING = 101, SYNC_PING = 101,
SYNC_PING_REPLY = 103, SYNC_PING_REPLY = 103,
...@@ -52,13 +52,13 @@ typedef enum ESyncTimeoutType { ...@@ -52,13 +52,13 @@ typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_PING = 100,
SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_ELECTION,
SYNC_TIMEOUT_HEARTBEAT, SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType; } ESyncTimeoutType;
typedef struct SyncTimeout { typedef struct SyncTimeout {
uint32_t bytes; uint32_t bytes;
uint32_t msgType; uint32_t msgType;
ESyncTimeoutType timeoutType; ESyncTimeoutType timeoutType;
uint64_t logicClock;
void* data; void* data;
} SyncTimeout; } SyncTimeout;
...@@ -69,7 +69,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* ...@@ -69,7 +69,7 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout*
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg); cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data);
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncPing { typedef struct SyncPing {
......
...@@ -28,10 +28,6 @@ extern "C" { ...@@ -28,10 +28,6 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg);
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,8 +28,6 @@ extern "C" { ...@@ -28,8 +28,6 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
// TLA+ Spec // TLA+ Spec
// AppendEntries(i, j) == // AppendEntries(i, j) ==
// /\ i /= j // /\ i /= j
...@@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { ...@@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) == // HandleAppendEntriesRequest(i, j, m) ==
// LET logOk == \/ m.mprevLogIndex = 0 // LET logOk == \/ m.mprevLogIndex = 0
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) == // HandleAppendEntriesResponse(i, j, m) ==
// /\ m.mterm = currentTerm[i] // /\ m.mterm = currentTerm[i]
......
...@@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->msgType, pMsg->contLen); pMsg->msgType, pMsg->contLen);
{ {
cJSON *pJson = syncRpcMsg2Json(pMsg); cJSON *pJson = syncRpcMsg2Json(pMsg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized); sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized); free(serialized);
cJSON_Delete(pJson); cJSON_Delete(pJson);
...@@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
int type; int type;
qall = taosAllocateQall(); qall = taosAllocateQall();
...@@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) {
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote) {
SyncRequestVote *pSyncMsg;
pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen);
syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply) {
SyncRequestVoteReply *pSyncMsg;
pSyncMsg = SyncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries) {
SyncAppendEntries *pSyncMsg;
pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen);
syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply) {
SyncAppendEntriesReply *pSyncMsg;
pSyncMsg = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) { } else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) { if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg; SyncTimeout *pSyncMsg;
......
...@@ -31,16 +31,19 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); ...@@ -31,16 +31,19 @@ static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -96,8 +99,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -96,8 +99,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000; pSyncNode->pingTimerMS = PING_TIMER_MS;
atomic_store_8(&pSyncNode->pingTimerEnable, 0); atomic_store_64(&pSyncNode->pingTimerLogicClock, 0);
atomic_store_64(&pSyncNode->pingTimerLogicClockUser, 0);
pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0; pSyncNode->pingTimerCounter = 0;
...@@ -151,6 +155,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { ...@@ -151,6 +155,9 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
} }
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS;
if (pSyncNode->pPingTimer == NULL) { if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer = pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
...@@ -159,12 +166,11 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { ...@@ -159,12 +166,11 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} }
atomic_store_8(&pSyncNode->pingTimerEnable, 1);
return 0; return 0;
} }
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerEnable, 0); atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
pSyncNode->pingTimerMS = TIMER_MAX_MS; pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0; return 0;
} }
...@@ -188,8 +194,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { ...@@ -188,8 +194,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pHeartbeatTimer == NULL) { if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer = pSyncNode->pHeartbeatTimer =
...@@ -209,20 +213,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -209,20 +213,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode); sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
...@@ -252,16 +242,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn ...@@ -252,16 +242,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn
return ret; return ret;
} }
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
...@@ -311,26 +291,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -311,26 +291,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret; return ret;
} }
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0; int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->"); sTrace("<-- syncNodeOnTimeoutCb -->");
...@@ -344,7 +304,7 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -344,7 +304,7 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
} }
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_8(&ths->pingTimerEnable)) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter); ++(ths->pingTimerCounter);
syncNodePingAll(ths); syncNodePingAll(ths);
} }
...@@ -359,10 +319,12 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -359,10 +319,12 @@ static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) { if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
// pSyncNode->pingTimerMS += 100; // pSyncNode->pingTimerMS += 100;
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); SyncTimeout* pSyncMsg =
syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
...@@ -371,10 +333,49 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -371,10 +333,49 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} else { } else {
sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock,
pSyncNode->pingTimerLogicClockUser);
} }
} }
static void syncNodeEqElectTimer(void* param, void* tmrId) {} static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache.addr = 0;
pSyncNode->leaderCache.vgId = 0;
}
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode);
}
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->leaderCache = pSyncNode->raftId;
// next Index +=1
// match Index = 0;
syncNodeStopElectTimer(pSyncNode);
syncNodeStartHeartbeatTimer(pSyncNode);
// appendEntries;
}
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
...@@ -123,6 +123,8 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { ...@@ -123,6 +123,8 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock);
cJSON_AddStringToObject(pRoot, "logicClock", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf); cJSON_AddStringToObject(pRoot, "data", u64buf);
...@@ -131,9 +133,10 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { ...@@ -131,9 +133,10 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
return pJson; return pJson;
} }
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild(); SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType; pMsg->timeoutType = timeoutType;
pMsg->logicClock = logicClock;
pMsg->data = data; pMsg->data = data;
return pMsg; return pMsg;
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncRequestVote.h" #include "syncRequestVote.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
// TLA+ Spec // TLA+ Spec
// RequestVote(i, j) == // RequestVote(i, j) ==
// /\ state[i] = Candidate // /\ state[i] = Candidate
...@@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { ...@@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteRequest(i, j, m) == // HandleRequestVoteRequest(i, j, m) ==
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteResponse(i, j, m) == // HandleRequestVoteResponse(i, j, m) ==
// \* This tallies votes even when the current state is not Candidate, but // \* This tallies votes even when the current state is not Candidate, but
......
...@@ -9,6 +9,7 @@ add_executable(syncIOSendMsgClientTest "") ...@@ -9,6 +9,7 @@ add_executable(syncIOSendMsgClientTest "")
add_executable(syncIOSendMsgServerTest "") add_executable(syncIOSendMsgServerTest "")
add_executable(syncRaftStoreTest "") add_executable(syncRaftStoreTest "")
add_executable(syncEnqTest "") add_executable(syncEnqTest "")
add_executable(syncIndexTest "")
target_sources(syncTest target_sources(syncTest
...@@ -55,6 +56,10 @@ target_sources(syncEnqTest ...@@ -55,6 +56,10 @@ target_sources(syncEnqTest
PRIVATE PRIVATE
"syncEnqTest.cpp" "syncEnqTest.cpp"
) )
target_sources(syncIndexTest
PRIVATE
"syncIndexTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -112,6 +117,11 @@ target_include_directories(syncEnqTest ...@@ -112,6 +117,11 @@ target_include_directories(syncEnqTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncIndexTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -158,6 +168,10 @@ target_link_libraries(syncEnqTest ...@@ -158,6 +168,10 @@ target_link_libraries(syncEnqTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncIndexTest
sync
gtest_main
)
enable_testing() enable_testing()
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void print(SHashObj *pNextIndex) {
printf("----------------\n");
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
while (p) {
printf("%lu \n", *p);
p = (uint64_t *)taosHashIterate(pNextIndex, p);
}
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
sTrace("sync log test: trace");
sDebug("sync log test: debug");
sInfo("sync log test: info");
sWarn("sync log test: warn");
sError("sync log test: error");
sFatal("sync log test: fatal");
SRaftId me;
SRaftId peer1;
SRaftId peer2;
me.addr = 0;
me.vgId = 99;
peer1.addr = 1;
peer1.vgId = 99;
peer2.addr = 2;
peer2.vgId = 99;
uint64_t index;
SHashObj *pNextIndex =
taosHashInit(sizeof(SRaftId), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
index = 1000;
taosHashPut(pNextIndex, &me, sizeof(me), &index, sizeof(index));
index = 1001;
taosHashPut(pNextIndex, &peer1, sizeof(peer1), &index, sizeof(index));
index = 1002;
taosHashPut(pNextIndex, &peer2, sizeof(peer2), &index, sizeof(index));
print(pNextIndex);
SRaftId find;
find = peer1;
uint64_t *p;
p = (uint64_t *)taosHashGet(pNextIndex, &find, sizeof(find));
(*p) += 900;
print(pNextIndex);
taosHashCleanup(pNextIndex);
return 0;
}
...@@ -84,6 +84,17 @@ int main(int argc, char** argv) { ...@@ -84,6 +84,17 @@ int main(int argc, char** argv) {
assert(ret == 0); assert(ret == 0);
taosMsleep(10000); taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode); ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
......
#include <stdio.h>
#include <gtest/gtest.h>
#include "syncRaftStore.h" #include "syncRaftStore.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
......
#include <stdio.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册