提交 11c34638 编写于 作者: S Shengliang Guan

refact: adjust sync code struct

上级 4940080a
...@@ -22,49 +22,9 @@ extern "C" { ...@@ -22,49 +22,9 @@ extern "C" {
#include "sync.h" #include "sync.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
// clang-format off
#define sFatal(...) if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }
#define sError(...) if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define sWarn(...) if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }
#define sInfo(...) if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }
#define sDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
#define sTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
#define sLFatal(...) if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }
#define sLError(...) if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define sLWarn(...) if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }
#define sLInfo(...) if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }
#define sLDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
#define sLTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, pNode, __VA_ARGS__); }
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, pNode, __VA_ARGS__); }
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
#define sSWarn(pSender, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotSenderLog("SYN WARN ", DEBUG_WARN, 255, pSender, __VA_ARGS__); }
#define sSInfo(pSender, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintSnapshotSenderLog("SYN ", DEBUG_INFO, 255, pSender, __VA_ARGS__); }
#define sSDebug(pSender, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintSnapshotSenderLog("SYN ", DEBUG_DEBUG, sDebugFlag, pSender, __VA_ARGS__); }
#define sSTrace(pSender, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintSnapshotSenderLog("SYN ", DEBUG_TRACE, sDebugFlag, pSender, __VA_ARGS__); }
#define sRFatal(pReceiver, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotReceiverLog("SYN FATAL ", DEBUG_FATAL, 255, pReceiver, __VA_ARGS__); }
#define sRError(pReceiver, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotReceiverLog("SYN ERROR ", DEBUG_ERROR, 255, pReceiver, __VA_ARGS__); }
#define sRWarn(pReceiver, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotReceiverLog("SYN WARN ", DEBUG_WARN, 255, pReceiver, __VA_ARGS__); }
#define sRInfo(pReceiver, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_INFO, 255, pReceiver, __VA_ARGS__); }
#define sRDebug(pReceiver, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_DEBUG, sDebugFlag, pReceiver, __VA_ARGS__); }
#define sRTrace(pReceiver, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_TRACE, sDebugFlag, pReceiver, __VA_ARGS__); }
// clang-format on
typedef struct SyncTimeout SyncTimeout; typedef struct SyncTimeout SyncTimeout;
typedef struct SyncClientRequest SyncClientRequest; typedef struct SyncClientRequest SyncClientRequest;
typedef struct SyncRequestVote SyncRequestVote; typedef struct SyncRequestVote SyncRequestVote;
...@@ -91,8 +51,6 @@ typedef struct SyncHeartbeatReply SyncHeartbeatReply; ...@@ -91,8 +51,6 @@ typedef struct SyncHeartbeatReply SyncHeartbeatReply;
typedef struct SyncHeartbeat SyncHeartbeat; typedef struct SyncHeartbeat SyncHeartbeat;
typedef struct SyncPreSnapshot SyncPreSnapshot; typedef struct SyncPreSnapshot SyncPreSnapshot;
extern bool gRaftDetailLog;
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; SyncNodeId addr;
SyncGroupId vgId; SyncGroupId vgId;
...@@ -239,9 +197,6 @@ void syncNodeClose(SSyncNode* pSyncNode); ...@@ -239,9 +197,6 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
// on message --------------------- // on message ---------------------
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
...@@ -269,9 +224,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); ...@@ -269,9 +224,6 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
// utils -------------- // utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
// raft state change -------------- // raft state change --------------
...@@ -302,13 +254,10 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index); ...@@ -302,13 +254,10 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex); int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code); int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId); SSyncTimer* syncNodeGetHbTimer(SSyncNode* ths, SRaftId* pDestId);
...@@ -318,13 +267,7 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const ...@@ -318,13 +267,7 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
bool syncNodeCanChange(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool syncNodeIsMnode(SSyncNode* pSyncNode); bool syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
......
...@@ -21,6 +21,46 @@ extern "C" { ...@@ -21,6 +21,46 @@ extern "C" {
#endif #endif
#include "syncInt.h" #include "syncInt.h"
#include "tlog.h"
// clang-format off
#define sFatal(...) if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }
#define sError(...) if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define sWarn(...) if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }
#define sInfo(...) if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }
#define sDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
#define sTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
#define sLFatal(...) if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }
#define sLError(...) if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define sLWarn(...) if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }
#define sLInfo(...) if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }
#define sLDebug(...) if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }
#define sLTrace(...) if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }
#define sNFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintNodeLog("SYN FATAL ", DEBUG_FATAL, 255, pNode, __VA_ARGS__); }
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, pNode, __VA_ARGS__); }
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
#define sSWarn(pSender, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotSenderLog("SYN WARN ", DEBUG_WARN, 255, pSender, __VA_ARGS__); }
#define sSInfo(pSender, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintSnapshotSenderLog("SYN ", DEBUG_INFO, 255, pSender, __VA_ARGS__); }
#define sSDebug(pSender, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintSnapshotSenderLog("SYN ", DEBUG_DEBUG, sDebugFlag, pSender, __VA_ARGS__); }
#define sSTrace(pSender, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintSnapshotSenderLog("SYN ", DEBUG_TRACE, sDebugFlag, pSender, __VA_ARGS__); }
#define sRFatal(pReceiver, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotReceiverLog("SYN FATAL ", DEBUG_FATAL, 255, pReceiver, __VA_ARGS__); }
#define sRError(pReceiver, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotReceiverLog("SYN ERROR ", DEBUG_ERROR, 255, pReceiver, __VA_ARGS__); }
#define sRWarn(pReceiver, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintSnapshotReceiverLog("SYN WARN ", DEBUG_WARN, 255, pReceiver, __VA_ARGS__); }
#define sRInfo(pReceiver, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_INFO, 255, pReceiver, __VA_ARGS__); }
#define sRDebug(pReceiver, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_DEBUG, sDebugFlag, pReceiver, __VA_ARGS__); }
#define sRTrace(pReceiver, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintSnapshotReceiverLog("SYN ", DEBUG_TRACE, sDebugFlag, pReceiver, __VA_ARGS__); }
// clang-format on
uint64_t syncUtilAddr2U64(const char* host, uint16_t port); uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port); void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port);
......
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncEnv.h" #include "syncEnv.h"
#include "syncUtil.h"
#include "tref.h" #include "tref.h"
static SSyncEnv gSyncEnv = {0}; static SSyncEnv gSyncEnv = {0};
static int32_t gNodeRefId = -1; static int32_t gNodeRefId = -1;
bool gRaftDetailLog = false;
static void syncEnvTick(void *param, void *tmrId); static void syncEnvTick(void *param, void *tmrId);
SSyncEnv *syncEnv() { return &gSyncEnv; } SSyncEnv *syncEnv() { return &gSyncEnv; }
......
...@@ -45,6 +45,18 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNew ...@@ -45,6 +45,18 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNew
static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
static bool syncNodeCanChange(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
static SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
int64_t syncOpen(SSyncInfo* pSyncInfo) { int64_t syncOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
...@@ -133,30 +145,43 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -133,30 +145,43 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return code; if (pSyncNode == NULL) return code;
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { switch (pMsg->msgType) {
case TDMT_SYNC_HEARTBEAT:
code = syncNodeOnHeartbeat(pSyncNode, pMsg); code = syncNodeOnHeartbeat(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { break;
case TDMT_SYNC_HEARTBEAT_REPLY:
code = syncNodeOnHeartbeatReply(pSyncNode, pMsg); code = syncNodeOnHeartbeatReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { break;
case TDMT_SYNC_TIMEOUT:
code = syncNodeOnTimeout(pSyncNode, pMsg); code = syncNodeOnTimeout(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { break;
case TDMT_SYNC_CLIENT_REQUEST:
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { break;
case TDMT_SYNC_REQUEST_VOTE:
code = syncNodeOnRequestVote(pSyncNode, pMsg); code = syncNodeOnRequestVote(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { break;
case TDMT_SYNC_REQUEST_VOTE_REPLY:
code = syncNodeOnRequestVoteReply(pSyncNode, pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { break;
case TDMT_SYNC_APPEND_ENTRIES:
code = syncNodeOnAppendEntries(pSyncNode, pMsg); code = syncNodeOnAppendEntries(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { break;
case TDMT_SYNC_APPEND_ENTRIES_REPLY:
code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { break;
case TDMT_SYNC_SNAPSHOT_SEND:
code = syncNodeOnSnapshot(pSyncNode, pMsg); code = syncNodeOnSnapshot(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { break;
case TDMT_SYNC_SNAPSHOT_RSP:
code = syncNodeOnSnapshotReply(pSyncNode, pMsg); code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { break;
case TDMT_SYNC_LOCAL_CMD:
code = syncNodeOnLocalCmd(pSyncNode, pMsg); code = syncNodeOnLocalCmd(pSyncNode, pMsg);
} else { break;
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); default:
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg,
TMSG_INFO(pMsg->msgType));
code = -1; code = -1;
} }
...@@ -1619,8 +1644,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ...@@ -1619,8 +1644,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "candidate to follower"); sNTrace(pSyncNode, "candidate to follower");
} }
// raft vote --------------
// just called by syncNodeVoteForSelf // just called by syncNodeVoteForSelf
// need assert // need assert
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncUtil.h"
// file must already exist! // file must already exist!
SRaftCfgIndex *raftCfgIndexOpen(const char *path) { SRaftCfgIndex *raftCfgIndexOpen(const char *path) {
......
...@@ -36,6 +36,8 @@ extern "C" { ...@@ -36,6 +36,8 @@ extern "C" {
typedef struct SyncPing SyncPing; typedef struct SyncPing SyncPing;
typedef struct SyncPingReply SyncPingReply; typedef struct SyncPingReply SyncPingReply;
extern bool gRaftDetailLog;
typedef struct SSyncIO { typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset *pQset; STaosQset *pQset;
......
...@@ -110,6 +110,7 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore); ...@@ -110,6 +110,7 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore);
cJSON* syncNode2Json(const SSyncNode* pSyncNode); cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
cJSON* voteGranted2Json(SVotesGranted* pVotesGranted); cJSON* voteGranted2Json(SVotesGranted* pVotesGranted);
char* voteGranted2Str(SVotesGranted* pVotesGranted); char* voteGranted2Str(SVotesGranted* pVotesGranted);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
bool gRaftDetailLog = false;
SSyncIO *gSyncIO = NULL; SSyncIO *gSyncIO = NULL;
// local function ------------ // local function ------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册