提交 10296908 编写于 作者: M Minghao Li

refactor(sync): add local-cmd:follower-commit

上级 1a4b7622
...@@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); ...@@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
typedef enum { typedef enum {
SYNC_LOCAL_CMD_STEP_DOWN = 100, SYNC_LOCAL_CMD_STEP_DOWN = 100,
SYNC_LOCAL_CMD_FOLLOWER_CMT,
} ESyncLocalCmd; } ESyncLocalCmd;
const char* syncLocalCmdGetStr(int32_t cmd); const char* syncLocalCmdGetStr(int32_t cmd);
...@@ -742,6 +743,7 @@ typedef struct SyncLocalCmd { ...@@ -742,6 +743,7 @@ typedef struct SyncLocalCmd {
int32_t cmd; int32_t cmd;
SyncTerm sdNewTerm; // step down new term SyncTerm sdNewTerm; // step down new term
SyncIndex fcIndex;// follower commit index
} SyncLocalCmd; } SyncLocalCmd;
......
...@@ -90,6 +90,11 @@ ...@@ -90,6 +90,11 @@
// //
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeEventLog(ths, "can not do follower commit");
return -1;
}
// maybe update commit index, leader notice me // maybe update commit index, leader notice me
if (newCommitIndex > ths->commitIndex) { if (newCommitIndex > ths->commitIndex) {
// has commit entry in local // has commit entry in local
......
...@@ -2842,11 +2842,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -2842,11 +2842,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex; ths->minMatchIndex = pMsg->minMatchIndex;
#if 0
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollowerCommit(ths, pMsg->commitIndex); // syncNodeFollowerCommit(ths, pMsg->commitIndex);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->fcIndex = pMsg->commitIndex;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex);
}
}
} }
#endif
} }
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
...@@ -2900,6 +2914,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { ...@@ -2900,6 +2914,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm); syncNodeStepDown(ths, pMsg->sdNewTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
syncNodeFollowerCommit(ths, pMsg->fcIndex);
} else { } else {
syncNodeErrorLog(ths, "error local cmd"); syncNodeErrorLog(ths, "error local cmd");
} }
......
...@@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { ...@@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
const char* syncLocalCmdGetStr(int32_t cmd) { const char* syncLocalCmdGetStr(int32_t cmd) {
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
return "step-down"; return "step-down";
} else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
return "follower-commit";
} }
return "unknown-local-cmd"; return "unknown-local-cmd";
...@@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { ...@@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
......
...@@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() { ...@@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100; pMsg->destId.vgId = 100;
pMsg->sdNewTerm = 123; pMsg->sdNewTerm = 123;
pMsg->fcIndex = 456;
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
return pMsg; return pMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册