From b69a26678c6fa8fee3d104da2fafd857fde3b506 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 12 May 2022 15:23:41 +0800 Subject: [PATCH] enh(sync): raft config change --- include/common/tmsgdef.h | 1 + source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/inc/syncUtil.h | 5 +++ source/libs/sync/src/syncAppendEntries.c | 22 +++++++++--- source/libs/sync/src/syncCommit.c | 13 ++++++- source/libs/sync/src/syncMain.c | 44 ++++++++++++++++++++++-- source/libs/sync/src/syncUtil.c | 28 +++++++++++++++ 7 files changed, 107 insertions(+), 7 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 8e918c40f9..c7deaa7845 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -217,6 +217,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index cb539e8379..8a21eea7b7 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -271,6 +271,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 83f31c5dac..159af1610e 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -57,6 +57,11 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b); SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b); void syncUtilMsgHtoN(void* msg); void syncUtilMsgNtoH(void* msg); +bool syncUtilIsData(tmsg_t msgType); +bool syncUtilUserPreCommit(tmsg_t msgType); +bool syncUtilUserCommit(tmsg_t msgType); +bool syncUtilUserRollback(tmsg_t msgType); + #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 6623ed1caa..aed19d042e 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -19,6 +19,7 @@ #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "syncRaftCfg.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -199,7 +200,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + //if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -227,7 +229,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; cbMeta.isWeak = pAppendEntry->isWeak; @@ -258,7 +261,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; cbMeta.isWeak = pAppendEntry->isWeak; @@ -320,7 +324,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -330,6 +335,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); } + // config change + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + syncNodeUpdateConfig(ths, &newSyncCfg); + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5d6cbd2a58..620f0e9cd2 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -19,6 +19,7 @@ #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" +#include "syncRaftCfg.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -101,7 +102,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -111,6 +113,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); } + // config change + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 911d8384f0..da23d8415b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -116,6 +116,15 @@ void syncStop(int64_t rid) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; + char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; + rpcMsg.noResp = 1; + rpcMsg.contLen = strlen(configChange) + 1; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange); + taosMemoryFree(configChange); + ret = syncPropose(rid, &rpcMsg, false); return ret; } @@ -849,6 +858,35 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { + pSyncNode->pRaftCfg->cfg = *newConfig; + int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); + ASSERT(ret == 0); + + // init internal + pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; + syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + + // init peersNum, peers, peersId + pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; + int j = 0; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + if (i != pSyncNode->pRaftCfg->cfg.myIndex) { + pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; + j++; + } + } + for (int i = 0; i < pSyncNode->peersNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + } + + // init replicaNum, replicasId + pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + } +} + SSyncNode* syncNodeAcquire(int64_t rid) { SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); if (pNode == NULL) { @@ -1207,7 +1245,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; @@ -1228,7 +1267,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; cbMeta.isWeak = pEntry->isWeak; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 3110c0b2a3..cf045a6926 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -212,4 +212,32 @@ void syncUtilMsgNtoH(void* msg) { SMsgHead* pHead = msg; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); +} + +bool syncUtilIsData(tmsg_t msgType) { + if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) { + return false; + } + return true; +} + +bool syncUtilUserPreCommit(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; +} + +bool syncUtilUserCommit(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; +} + +bool syncUtilUserRollback(tmsg_t msgType) { + if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + return true; + } + return false; } \ No newline at end of file -- GitLab