From a0319ae49bdf86275adf1e150c71ea5dc1ba8958 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 14:42:04 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncAppendEntries.h | 4 -- source/libs/sync/inc/syncAppendEntriesReply.h | 2 - source/libs/sync/inc/syncElection.h | 1 - source/libs/sync/inc/syncInt.h | 2 + source/libs/sync/inc/syncOnMessage.h | 2 - source/libs/sync/inc/syncTimeout.h | 2 +- source/libs/sync/src/syncElection.c | 7 ++-- source/libs/sync/src/syncMain.c | 39 +++---------------- source/libs/sync/src/syncMessage.c | 2 - source/libs/sync/src/syncTimeout.c | 37 +++++++++++++++++- 10 files changed, 48 insertions(+), 50 deletions(-) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index b7c1c051cc..29358fcf90 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,10 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg); - -void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 22f8eb464f..af6453d839 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,8 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index ed5b86fa98..7299a3fe2e 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -27,7 +27,6 @@ extern "C" { #include "taosdef.h" void syncNodeElect(SSyncNode* pSyncNode); -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d4703b3e74..e015bee530 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -191,6 +191,8 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 07c44b6199..8eae4fed4d 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -26,8 +26,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onMessage(SRaft *pRaft, void *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index d9d6a17939..3dda1f212c 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -28,7 +28,7 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onTimeout(SRaft *pRaft, void *pMsg); +int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 433201b849..6d12af02c0 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -15,6 +15,7 @@ #include "syncElection.h" -void syncNodeElect(SSyncNode* pSyncNode) {} - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file +void syncNodeElect(SSyncNode* pSyncNode) { + // start election + syncNodeRequestVotePeers(pSyncNode); +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ad3639b32d..c2ceb515b0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,6 +18,7 @@ #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncTimeout.h" #include "syncUtil.h" static int32_t tsNodeRefId = -1; @@ -33,7 +34,6 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); @@ -41,9 +41,6 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); static void syncNodeLeader2Follower(SSyncNode* pSyncNode); static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -171,6 +168,10 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { syncPingDestroy(pMsg); } +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} + int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerMS = PING_TIMER_MS; @@ -311,32 +312,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } -static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { - int32_t ret = 0; - sTrace("<-- syncNodeOnTimeoutCb -->"); - - { - cJSON* pJson = syncTimeout2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { - if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { - ++(ths->pingTimerCounter); - syncNodePingAll(ths); - } - - } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { - } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { - } else { - } - - return ret; -} - static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { @@ -415,7 +390,3 @@ static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} - -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} - -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 33e311b0fa..14f139a803 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -18,8 +18,6 @@ #include "syncUtil.h" #include "tcoding.h" -void onMessage(SRaft* pRaft, void* pMsg) {} - // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index e27df55d07..589921dddc 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,5 +14,40 @@ */ #include "syncTimeout.h" +#include "syncElection.h" -void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file +int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->electTimerCounter); + syncNodeElect(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { + ++(ths->heartbeatTimerCounter); + syncNodeAppendEntriesPeers(ths); + } + } else { + sTrace("unknown timeoutType:%d", pMsg->timeoutType); + } + + return ret; +} \ No newline at end of file -- GitLab