From ded461607443e8c1edbaa7aae80ed0310939e124 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 4 Jul 2022 14:55:26 +0800 Subject: [PATCH] refactor(sync): add sync strategy --- include/libs/sync/sync.h | 24 ++++++++++++------- source/dnode/mnode/impl/src/mndMain.c | 18 +++++++------- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 3 ++- source/libs/sync/src/syncMain.c | 2 +- .../test/syncConfigChangeSnapshotTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 6 ++--- 7 files changed, 32 insertions(+), 25 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d4df039394..6e36e03b92 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -31,6 +31,12 @@ extern bool gRaftDetailLog; #define SYNC_INDEX_INVALID -1 #define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF +typedef enum { + SYNC_STRATEGY_NO_SNAPSHOT = 0, + SYNC_STRATEGY_STANDARD_SNAPSHOT = 1, + SYNC_STRATEGY_WAL_FIRST = 2, +} ESyncStrategy; + typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; typedef int64_t SyncIndex; @@ -183,15 +189,15 @@ typedef struct SSyncLogStore { } SSyncLogStore; typedef struct SSyncInfo { - bool isStandBy; - bool snapshotEnable; - SyncGroupId vgId; - int32_t batchSize; - SSyncCfg syncCfg; - char path[TSDB_FILENAME_LEN]; - SWal* pWal; - SSyncFSM* pFsm; - SMsgCb* msgcb; + bool isStandBy; + ESyncStrategy snapshotStrategy; + SyncGroupId vgId; + int32_t batchSize; + SSyncCfg syncCfg; + char path[TSDB_FILENAME_LEN]; + SWal* pWal; + SSyncFSM* pFsm; + SMsgCb* msgcb; int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c39c9847a9..a3b9bf57fd 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -58,7 +58,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { static void mndPullupTrans(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -67,14 +67,14 @@ static void mndPullupTrans(SMnode *pMnode) { static void mndTtlTimer(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -83,7 +83,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); @@ -395,7 +395,7 @@ void mndStop(SMnode *pMnode) { } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; @@ -413,7 +413,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } do { - char * syncNodeStr = sync2SimpleStr(pMgmt->sync); + char *syncNodeStr = sync2SimpleStr(pMgmt->sync); static int64_t mndTick = 0; if (++mndTick % 10 == 1) { mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); @@ -579,7 +579,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; @@ -632,7 +632,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) { if (mndAcquireRpcRef(pMnode) != 0) return -1; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); @@ -713,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { - SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 971cecf63a..2053f3886c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -178,7 +178,7 @@ int32_t mndInitSync(SMnode *pMnode) { syncInfo.pWal = pMnode->pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); syncInfo.isStandBy = pMgmt->standby; - syncInfo.snapshotEnable = true; + syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT; mInfo("start to open mnode sync, standby:%d", pMgmt->standby); if (pMgmt->standby || pMgmt->replica.id > 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 8af7e8c9b3..201f4a4180 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -442,7 +442,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo = { - .snapshotEnable = false, + .snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT, + .batchSize = 10, .vgId = pVnode->config.vgId, .isStandBy = pVnode->config.standby, .syncCfg = pVnode->config.syncCfg, diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ddda60bc18..56e459751a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -815,7 +815,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // create a new raft config file SRaftCfgMeta meta; meta.isStandBy = pSyncInfo->isStandBy; - meta.snapshotEnable = pSyncInfo->snapshotEnable; + meta.snapshotEnable = pSyncInfo->snapshotStrategy; meta.lastConfigIndex = SYNC_INDEX_INVALID; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ASSERT(ret == 0); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 01f321cbd4..968baff952 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -198,7 +198,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; syncInfo.isStandBy = isStandBy; - syncInfo.snapshotEnable = true; + syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT; SSyncCfg* pCfg = &syncInfo.syncCfg; diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 505260e978..c6d3a3e4af 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -203,7 +203,7 @@ SWal* createWal(char* path, int32_t vgId) { } int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy, - bool enableSnapshot) { + ESyncStrategy enableSnapshot) { SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; @@ -213,7 +213,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; syncInfo.isStandBy = isStandBy; - syncInfo.snapshotEnable = enableSnapshot; + syncInfo.snapshotStrategy = enableSnapshot; SSyncCfg* pCfg = &syncInfo.syncCfg; @@ -316,7 +316,7 @@ int main(int argc, char** argv) { int32_t replicaNum = atoi(argv[1]); int32_t myIndex = atoi(argv[2]); - bool enableSnapshot = atoi(argv[3]); + ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); int32_t lastApplyIndex = atoi(argv[4]); int32_t lastApplyTerm = atoi(argv[5]); int32_t writeRecordNum = atoi(argv[6]); -- GitLab