提交 f1a41f5a 编写于 作者: S Shengliang Guan

refact: sync message headfile

上级 f2191363
......@@ -239,11 +239,22 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
// on message ---------------------
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
// timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
......
......@@ -236,26 +236,6 @@ typedef struct SyncLocalCmd {
SyncIndex fcIndex; // follower commit index
} SyncLocalCmd;
// on message ----------------------
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
// option ----------------------------------
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
const char* syncTimerTypeStr( ESyncTimeoutType timerType);
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd);
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId);
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
......@@ -273,6 +253,9 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId);
const char* syncTimerTypeStr(ESyncTimeoutType timerType);
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd);
#ifdef __cplusplus
}
#endif
......
......@@ -34,7 +34,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>>
//
int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus
}
......
......@@ -138,7 +138,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
code = syncNodeOnTimer(pSyncNode, pMsg);
code = syncNodeOnTimeout(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
......@@ -1034,9 +1034,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode);
}
// option
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; }
// timer control --------------
......
......@@ -15,10 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
#include "syncUtil.h"
#include "tcoding.h"
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode) {
......@@ -189,6 +186,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
#if 0
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncPreSnapshot);
pMsg->pCont = rpcMallocCont(bytes);
......@@ -222,6 +220,7 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
pPreSnapshotReply->vgId = vgId;
return 0;
}
#endif
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
......
......@@ -85,7 +85,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return 0;
}
int32_t syncNodeOnTimer(SSyncNode* ths, const SRpcMsg* pRpc) {
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
int32_t ret = 0;
SyncTimeout* pMsg = pRpc->pCont;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册