diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2c8e36fd0894cadadde5625d0b9e9abb27630e47..874763cd4565e6b830f7e5f0b31fe888aba4865d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -155,7 +155,7 @@ typedef struct SSyncNode { SSyncLogStore* pLogStore; SyncIndex commitIndex; - // timer + // ping timer tmr_h pPingTimer; int32_t pingTimerMS; uint64_t pingTimerLogicClock; @@ -163,6 +163,7 @@ typedef struct SSyncNode { TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; + // elect timer tmr_h pElectTimer; int32_t electTimerMS; uint64_t electTimerLogicClock; @@ -170,6 +171,7 @@ typedef struct SSyncNode { TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; + // heartbeat timer tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; uint64_t heartbeatTimerLogicClock; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 467cfdde5cfe6384ec910cefc3217cf88c2f5b09..aca6205b9dfd75d044ceb8156fb3683af45f097b 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -53,6 +53,7 @@ extern "C" { // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); +int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 87017b718d56b62ec6ace7046a33f86da55fa88f..24f41a0e6930fcf97e06e8758d64825e6ab206ec 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -28,11 +28,15 @@ // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + return ret; +} int32_t syncNodeElect(SSyncNode* pSyncNode) { // start election - syncNodeRequestVotePeers(pSyncNode); + int32_t ret = syncNodeRequestVotePeers(pSyncNode); + return ret; } int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 44d2e4c16087f87dae1d79c91c706ed803391905..f2939df60cf0ba7239873efd587a82b090b656ec 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,7 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static void UpdateTerm(SyncTerm term); +static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); @@ -452,9 +452,38 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } } -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= + atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { + SyncTimeout* pSyncMsg = + syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), + pSyncNode->heartbeatTimerMS, pSyncNode); + + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + // reset timer ms + // pSyncNode->heartbeatTimerMS += 100; -static void UpdateTerm(SyncTerm term) {} + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } else { + sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", + pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + } +} + +static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { + if (term > pSyncNode->pRaftStore->currentTerm) { + pSyncNode->pRaftStore->currentTerm = term; + pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID; + raftStorePersist(pSyncNode->pRaftStore); + syncNodeBecomeFollower(pSyncNode); + } +} static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 37e8959ff33a3c2f58db762ab9fb9053154c4e87..dfbe5db0ed7e6484a731f374e5051db3b1ab6f97 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -41,7 +41,16 @@ // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + return ret; +} + +int32_t syncNodeReplicate(SSyncNode* pSyncNode) { + // start replicate + int32_t ret = syncNodeAppendEntriesPeers(pSyncNode); + return ret; +} int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index df9b9d27b4a39f34f6ae7ef2100760f20baec40e..7cbfd6d40a6caa713e87964b59a4a0e257c897d0 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -44,7 +44,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->heartbeatTimerCounter); - syncNodeAppendEntriesPeers(ths); + syncNodeReplicate(ths); } } else { sTrace("unknown timeoutType:%d", pMsg->timeoutType); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 04be3f33aca4f6188953f845367683eb6b314067..fa6c245fd83b47b69c4a126c1e37f54b421609f2 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -127,7 +127,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) { cJSON_AddNumberToObject(pRoot, "vgId", p->vgId); cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); + cJSON_AddItemToObject(pJson, "SRaftId", pRoot); return pJson; }