diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index eedc403493c7274e086f5b3aa21b609be284d9e4..a1cff2b73832bc1aa6a00f920e4e30a0f7835060 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // --------------------------------------------- + typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, } ESyncLocalCmd; +const char* syncLocalCmdGetStr(int32_t cmd); + typedef struct SyncLocalCmd { uint32_t bytes; int32_t vgId; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ae053328ab4f93656a8977ea180470f748c55218..f4949e1016b18336faa302b065e04aab20689185 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 535db4181c438fef71e19f47720929a9d1e522f6..7142e8fb22f69b3d873a595059be945447c634c7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3066,11 +3066,27 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { #endif } -#if 1 if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeStepDown(ths, pMsg->term); + // syncNodeStepDown(ths, pMsg->term); + SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; + pSyncMsg->sdNewTerm = pMsg->term; + + SRpcMsg rpcMsgLocalCmd; + syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); + + if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (code != 0) { + sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); + rpcFreeCont(rpcMsgLocalCmd.pCont); + } else { + sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm); + } + } + + syncLocalCmdDestroy(pSyncMsg); } -#endif /* // htonl @@ -3096,6 +3112,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { } int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { + syncLogRecvLocalCmd(ths, pMsg, ""); + if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); @@ -3768,3 +3786,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p host, port, pMsg->term, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); } + +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd, + syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3c36633fe848845164d8c3018e4cd627631633c0..f9609d9c396d76c3d6043aea68795c022d07dca3 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { } // --------------------------------------------- +const char* syncLocalCmdGetStr(int32_t cmd) { + if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { + return "step-down"; + } + + return "unknown-local-cmd"; +} + SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncLocalCmd); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);