提交 a0319ae4 编写于 作者: M Minghao Li

sync refactor

上级 b5902836
......@@ -28,10 +28,6 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg);
void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -28,8 +28,6 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -27,7 +27,6 @@ extern "C" {
#include "taosdef.h"
void syncNodeElect(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
}
......
......@@ -191,6 +191,8 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
......
......@@ -26,8 +26,6 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
void onMessage(SRaft *pRaft, void *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -28,7 +28,7 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
void onTimeout(SRaft *pRaft, void *pMsg);
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
#ifdef __cplusplus
}
......
......@@ -15,6 +15,7 @@
#include "syncElection.h"
void syncNodeElect(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
void syncNodeElect(SSyncNode* pSyncNode) {
// start election
syncNodeRequestVotePeers(pSyncNode);
}
......@@ -18,6 +18,7 @@
#include "syncEnv.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncTimeout.h"
#include "syncUtil.h"
static int32_t tsNodeRefId = -1;
......@@ -33,7 +34,6 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
......@@ -41,9 +41,6 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
// ---------------------------------
int32_t syncInit() {
......@@ -171,6 +168,10 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
syncPingDestroy(pMsg);
}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
pSyncNode->pingTimerMS = PING_TIMER_MS;
......@@ -311,32 +312,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret;
}
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->");
{
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
syncNodePingAll(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
} else {
}
return ret;
}
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) {
......@@ -415,7 +390,3 @@ static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -18,8 +18,6 @@
#include "syncUtil.h"
#include "tcoding.h"
void onMessage(SRaft* pRaft, void* pMsg) {}
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
......
......@@ -14,5 +14,40 @@
*/
#include "syncTimeout.h"
#include "syncElection.h"
void onTimeout(SRaft *pRaft, void *pMsg) {}
\ No newline at end of file
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->");
{
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
syncNodePingAll(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter);
syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter);
syncNodeAppendEntriesPeers(ths);
}
} else {
sTrace("unknown timeoutType:%d", pMsg->timeoutType);
}
return ret;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册