提交 77ba9007 编写于 作者: M Minghao Li

refactor(sync): add sync local cmd, step down

上级 0b20b914
...@@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); ...@@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef enum { typedef enum {
SYNC_LOCAL_CMD_STEP_DOWN = 100, SYNC_LOCAL_CMD_STEP_DOWN = 100,
} ESyncLocalCmd; } ESyncLocalCmd;
const char* syncLocalCmdGetStr(int32_t cmd);
typedef struct SyncLocalCmd { typedef struct SyncLocalCmd {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
......
...@@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const ...@@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);
......
...@@ -3066,11 +3066,27 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -3066,11 +3066,27 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
#endif #endif
} }
#if 1
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeStepDown(ths, pMsg->term); // syncNodeStepDown(ths, pMsg->term);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
pSyncMsg->sdNewTerm = pMsg->term;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
}
}
syncLocalCmdDestroy(pSyncMsg);
} }
#endif
/* /*
// htonl // htonl
...@@ -3096,6 +3112,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { ...@@ -3096,6 +3112,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
} }
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm); syncNodeStepDown(ths, pMsg->sdNewTerm);
...@@ -3768,3 +3786,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p ...@@ -3768,3 +3786,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
host, port, pMsg->term, pMsg->privateTerm, s); host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
\ No newline at end of file
...@@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { ...@@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
} }
// --------------------------------------------- // ---------------------------------------------
const char* syncLocalCmdGetStr(int32_t cmd) {
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
return "step-down";
}
return "unknown-local-cmd";
}
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncLocalCmd); uint32_t bytes = sizeof(SyncLocalCmd);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册