diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 35e79cf45db6acedd4d2fc26e68222af50880fc5..aa563343f8d860ba80ec243a1b594a6545a61ff7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -129,6 +129,9 @@ typedef struct SSyncFSM { void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); + void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm); + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index b8ff7be46a8098fc3bc88eab0fcaea6884b507b2..37d5aeb62d73d523031cf5049b642c5a0645ca15 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -166,6 +166,18 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb mDebug("vgId:1, mnode leader transfer finish"); } +static void mndBecomeFollower(struct SSyncFSM *pFsm) { + SMnode *pMnode = pFsm->data; + mDebug("vgId:1, become follower"); + + // clear old leader resource +} + +static void mndBecomeLeader(struct SSyncFSM *pFsm) { + SMnode *pMnode = pFsm->data; + mDebug("vgId:1, become leader"); +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; @@ -175,6 +187,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpLeaderTransferCb = mndLeaderTransfer; pFsm->FpReConfigCb = mndReConfig; + pFsm->FpBecomeLeaderCb = mndBecomeLeader; + pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo; pFsm->FpSnapshotStartRead = mndSnapshotStartRead; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index c525d09057add72d07fdfcb8f7bc703f518752a6..7ac124fdd3e08383afdc3a949b4e03fcaacae83a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -672,6 +672,18 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); } +static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { + SVnode *pVnode = pFsm->data; + vDebug("vgId:%d, become follower", pVnode->config.vgId); + + // clear old leader resource +} + +static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { + SVnode *pVnode = pFsm->data; + vDebug("vgId:%d, become leader", pVnode->config.vgId); +} + static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; @@ -681,6 +693,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; + pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; + pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 20b4b23bf78e1f865cfd6b34e27154cb1de23442..5f3ff3015c308e17a724db28d97d299cd05029a6 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2028,6 +2028,11 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // reset elect timer syncNodeResetElectTimer(pSyncNode); + // call back + if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeFollowerCb != NULL) { + pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm); + } + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -2109,6 +2114,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // start heartbeat timer syncNodeStartHeartbeatTimer(pSyncNode); + // call back + if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) { + pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm); + } + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -3100,7 +3110,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } - void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);