diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 694a6ef62bf907b9cad3762ad11b5199ab4fb87d..b6ff93ec8587119d3fb3c34530ed7576dadb5245 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -211,6 +211,7 @@ void syncCleanUp(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); +void syncPreStop(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f604a9289a610aaf7cb0fd0ef8a800e32e5bd106..1b2d85bd29df73dd99814f91fc6b4d7a53f3f70f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { syncLeaderTransfer(pMnode->syncMgmt.sync); + syncPreStop(pMnode->syncMgmt.sync); } } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0696ec090178b56f85bcc757532cc81e00c41457..f7164c4ac3038942d0054db605486e74ff98f241 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -245,6 +245,7 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + syncPreStop(pVnode->sync); } } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 6ec29d69f5492552b1929e5cbd51ae9d794b62ce..a5ff653b69600741addf04eb5ad3417db6942db5 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); +void syncNodePreClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); // option diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h index b48519a5b038ae0e0c2909ec9f83c9e595067c37..2d87fcf7fa9cb7089390930ed2fa2244f043e57d 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, + SYNC_LOCAL_CMD_FOLLOWER_CMT, } ESyncLocalCmd; const char* syncLocalCmdGetStr(int32_t cmd); @@ -742,6 +743,7 @@ typedef struct SyncLocalCmd { int32_t cmd; SyncTerm sdNewTerm; // step down new term + SyncIndex fcIndex;// follower commit index } SyncLocalCmd; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4638475e716effc785f80409517b0b4391c54ba1..f0e296d8725931b985b3fa893605e40a24fc39a9 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -90,6 +90,11 @@ // int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeEventLog(ths, "can not do follower commit"); + return -1; + } + // maybe update commit index, leader notice me if (newCommitIndex > ths->commitIndex) { // has commit entry in local diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index eb78ea2894f074d4126b53f0c4b7dcc2eae2059e..5cd1ba30259702e439a0ec74a1359fd4b0e7fea5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -81,6 +81,15 @@ void syncStop(int64_t rid) { } } +void syncPreStop(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) return; + + syncNodePreClose(pSyncNode); + + syncNodeRelease(pSyncNode); +} + static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { if (!syncNodeInConfig(pSyncNode, pCfg)) return false; return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; @@ -435,8 +444,12 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { return -1; } - SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; - int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + int32_t ret = 0; + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + } + return ret; } @@ -1222,6 +1235,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ASSERT(ret == 0); } +void syncNodePreClose(SSyncNode* pSyncNode) { + // stop elect timer + syncNodeStopElectTimer(pSyncNode); + + // stop heartbeat timer + syncNodeStopHeartbeatTimer(pSyncNode); +} + void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { return; @@ -2825,11 +2846,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { syncNodeResetElectTimer(ths); ths->minMatchIndex = pMsg->minMatchIndex; -#if 0 if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { - syncNodeFollowerCommit(ths, pMsg->commitIndex); + // syncNodeFollowerCommit(ths, pMsg->commitIndex); + SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId); + pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; + pSyncMsg->fcIndex = pMsg->commitIndex; + + SRpcMsg rpcMsgLocalCmd; + syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); + + if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (code != 0) { + sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); + rpcFreeCont(rpcMsgLocalCmd.pCont); + } else { + sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex); + } + } } -#endif } if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { @@ -2883,6 +2918,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); + } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + syncNodeFollowerCommit(ths, pMsg->fcIndex); + } else { syncNodeErrorLog(ths, "error local cmd"); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 91e8ec91b701890f51eae8c5a5e1502b8a5da25a..d0df931a88ac11f6aa1b4c50d0ce21cbecdf9bf4 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { const char* syncLocalCmdGetStr(int32_t cmd) { if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) { return "step-down"; + } else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + return "follower-commit"; } return "unknown-local-cmd"; @@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex); + cJSON_AddStringToObject(pRoot, "fc-index", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp index de908bf9c1eb41bcfa5629e903b054314eaf9f40..b42626df29cfdf01b70a512c573daaf922d40f4a 100644 --- a/source/libs/sync/test/syncLocalCmdTest.cpp +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->sdNewTerm = 123; + pMsg->fcIndex = 456; pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; return pMsg;