diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 29358fcf90c174a2d706adbe2b188bb7fb5e4513..0156e695a3ead53bfb2e2e8f5b05d75891c78ca2 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index af6453d839aae95fa831d2caea1428bd558f81bb..7b80172e8dbd6b27664f0b43a4b3b5bb539a5ec5 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7299a3fe2eeb725dbf618891fbb726234d4404ea..abacfb8093067a2c2a1764e2c4663efbcdc151b1 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,7 +26,11 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -void syncNodeElect(SSyncNode* pSyncNode); +int32_t syncNodeElect(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e015bee5301ab9177cfb1c961f80562295c9fdcd..bce03059a04e8a29015aa4f18a8c71a2d2a55eb4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -188,29 +188,22 @@ typedef struct SSyncNode { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -void syncNodePingAll(SSyncNode* pSyncNode); -void syncNodePingPeers(SSyncNode* pSyncNode); -void syncNodePingSelf(SSyncNode* pSyncNode); -void syncNodeRequestVotePeers(SSyncNode* pSyncNode); -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +void syncNodePingAll(SSyncNode* pSyncNode); +void syncNodePingPeers(SSyncNode* pSyncNode); +void syncNodePingSelf(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index a9875d5caeb6fcfac97de9b18e06d5df1c5fa2fd..72ce986a7ea258684fbff8a9b5283c99a92cac9d 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,7 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + +int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 4fb21930105a82f2a8718ad817816cb9e0c83262..da821c3ebd07833834d5ded9372c67e3ac00b30d 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 21fb61f85fafb2d7cfcf7f083c5e00d25d9bb5b0..82f132f80bc8083e2e7c339ef2b0145b6261a66f 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,6 +28,8 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index f3045c31801c6d03f5c3f6c4fac6cbae06d9d8d0..243a566ff04496b88a5f1a7ed52e5e30c525a8d9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,33 +15,6 @@ #include "syncAppendEntries.h" -int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { - // TLA+ Spec - // AppendEntries(i, j) == - // /\ i /= j - // /\ state[i] = Leader - // /\ LET prevLogIndex == nextIndex[i][j] - 1 - // prevLogTerm == IF prevLogIndex > 0 THEN - // log[i][prevLogIndex].term - // ELSE - // 0 - // \* Send up to 1 entry, constrained by the end of the log. - // lastEntry == Min({Len(log[i]), nextIndex[i][j]}) - // entries == SubSeq(log[i], nextIndex[i][j], lastEntry) - // IN Send([mtype |-> AppendEntriesRequest, - // mterm |-> currentTerm[i], - // mprevLogIndex |-> prevLogIndex, - // mprevLogTerm |-> prevLogTerm, - // mentries |-> entries, - // \* mlog is used as a history variable for the proof. - // \* It would not exist in a real implementation. - // mlog |-> log[i], - // mcommitIndex |-> Min({commitIndex[i], lastEntry}), - // msource |-> i, - // mdest |-> j]) - // /\ UNCHANGED <> -} - int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 6d12af02c035fcedca3dd863c6cc41d1d7d45195..fe86d220cca8579cd9ca235a149b943b0933fcc4 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,8 +14,32 @@ */ #include "syncElection.h" +#include "syncMessage.h" -void syncNodeElect(SSyncNode* pSyncNode) { +int32_t syncNodeElect(SSyncNode* pSyncNode) { // start election syncNodeRequestVotePeers(pSyncNode); } + +// TLA+ Spec +// RequestVote(i, j) == +// /\ state[i] = Candidate +// /\ j \notin votesResponded[i] +// /\ Send([mtype |-> RequestVoteRequest, +// mterm |-> currentTerm[i], +// mlastLogTerm |-> LastTerm(log[i]), +// mlastLogIndex |-> Len(log[i]), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} + +int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { + sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c2ceb515b00382c897186650dca71be485842d89..cac2b6953de927fae07e5cc5450d508461819ed3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -15,23 +15,23 @@ #include #include "sync.h" +#include "syncAppendEntries.h" +#include "syncAppendEntriesReply.h" #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncRequestVote.h" +#include "syncRequestVoteReply.h" #include "syncTimeout.h" #include "syncUtil.h" static int32_t tsNodeRefId = -1; // ------ local funciton --------- -static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); - static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); @@ -135,6 +135,48 @@ void syncNodeClose(SSyncNode* pSyncNode) { free(pSyncNode); } +int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + sTrace("syncNodePing pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + { + SyncPing* pMsg2 = rpcMsg.pCont; + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + return ret; +} + void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); int32_t ret = 0; @@ -168,10 +210,6 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { syncPingDestroy(pMsg); } -void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} - -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} - int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); pSyncNode->pingTimerMS = PING_TIMER_MS; @@ -235,48 +273,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { } // ------ local funciton --------- -static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { - sTrace("syncNodePing pSyncNode:%p ", pSyncNode); - int32_t ret = 0; - - SRpcMsg rpcMsg; - syncPing2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - { - SyncPing* pMsg2 = rpcMsg.pCont; - cJSON* pJson = syncPing2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - return ret; -} - -static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilraftId2EpSet(destRaftId, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - -static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - SEpSet epSet; - syncUtilnodeInfo2EpSet(nodeInfo, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); - return 0; -} - static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { int32_t ret = 0; sTrace("<-- syncNodeOnPingCb -->"); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index e3e551fd2b2d3c5f2338e7e84345f8a618fec738..878a87067758909f8a5955578bd9be13b9c69a9c 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,5 +14,40 @@ */ #include "syncReplication.h" +#include "syncMessage.h" -void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file +// TLA+ Spec +// AppendEntries(i, j) == +// /\ i /= j +// /\ state[i] = Leader +// /\ LET prevLogIndex == nextIndex[i][j] - 1 +// prevLogTerm == IF prevLogIndex > 0 THEN +// log[i][prevLogIndex].term +// ELSE +// 0 +// \* Send up to 1 entry, constrained by the end of the log. +// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) +// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) +// IN Send([mtype |-> AppendEntriesRequest, +// mterm |-> currentTerm[i], +// mprevLogIndex |-> prevLogIndex, +// mprevLogTerm |-> prevLogTerm, +// mentries |-> entries, +// \* mlog is used as a history variable for the proof. +// \* It would not exist in a real implementation. +// mlog |-> log[i], +// mcommitIndex |-> Min({commitIndex[i], lastEntry}), +// msource |-> i, +// mdest |-> j]) +// /\ UNCHANGED <> +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} + +int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { + sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 38eaea26ac0c28b2ff3e6646ed9e5abbbab5537b..0edd6d2ce47cd7d638e69ac2cc9e1a5f72a1992b 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,20 +15,6 @@ #include "syncRequestVote.h" -int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { - // TLA+ Spec - // RequestVote(i, j) == - // /\ state[i] = Candidate - // /\ j \notin votesResponded[i] - // /\ Send([mtype |-> RequestVoteRequest, - // mterm |-> currentTerm[i], - // mlastLogTerm |-> LastTerm(log[i]), - // mlastLogIndex |-> Len(log[i]), - // msource |-> i, - // mdest |-> j]) - // /\ UNCHANGED <> -} - int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 589921dddc5e392cfcf4ae68f5b17875f38b2e02..df9b9d27b4a39f34f6ae7ef2100760f20baec40e 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -15,6 +15,7 @@ #include "syncTimeout.h" #include "syncElection.h" +#include "syncReplication.h" int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0;