提交 0b20b914 编写于 作者: M Minghao Li

refactor(sync): add sync local cmd

上级 bfb92713
...@@ -232,6 +232,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); ...@@ -232,6 +232,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid); int32_t syncEndSnapshot(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -586,6 +586,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -586,6 +586,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SRpcMsg rsp = {.code = code, .info = pMsg->info}; SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
syncLocalCmdDestroy(pSyncMsg);
} else { } else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1; code = -1;
......
...@@ -350,6 +350,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -350,6 +350,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
syncLocalCmdDestroy(pSyncMsg);
} else { } else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1; code = -1;
......
...@@ -528,6 +528,20 @@ int32_t syncEndSnapshot(int64_t rid) { ...@@ -528,6 +528,20 @@ int32_t syncEndSnapshot(int64_t rid) {
return code; return code;
} }
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
syncNodeStepDown(pSyncNode, newTerm);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
if (pSyncNode->peersNum == 0) { if (pSyncNode->peersNum == 0) {
sDebug("only one replica, cannot leader transfer"); sDebug("only one replica, cannot leader transfer");
...@@ -3041,12 +3055,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -3041,12 +3055,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
#if 1
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeStepDown(ths, pMsg->term);
}
#endif
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex; ths->minMatchIndex = pMsg->minMatchIndex;
...@@ -3058,6 +3066,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -3058,6 +3066,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
#endif #endif
} }
#if 1
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeStepDown(ths, pMsg->term);
}
#endif
/* /*
// htonl // htonl
SMsgHead* pHead = rpcMsg.pCont; SMsgHead* pHead = rpcMsg.pCont;
...@@ -3081,6 +3095,17 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { ...@@ -3081,6 +3095,17 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
return 0; return 0;
} }
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
} else {
syncNodeErrorLog(ths, "error local cmd");
}
return 0;
}
// TLA+ Spec // TLA+ Spec
// ClientRequest(i, v) == // ClientRequest(i, v) ==
// /\ state[i] = Leader // /\ state[i] = Leader
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册