From a4d6d84e4c40d899d5e0bf2a4b86967d6d815be8 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 18 Jul 2023 15:28:59 +0800 Subject: [PATCH] upperindex --- source/libs/sync/src/syncMain.c | 2 ++ source/libs/sync/src/syncPipeline.c | 36 +++++++++++------------------ 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d9b5f6f2a7..5c52ab1e7e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -514,6 +514,7 @@ SSyncState syncGetState(int64_t rid) { } else { state.canRead = state.restored; } + /* double progress = 0; if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){ progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex; @@ -526,6 +527,7 @@ SSyncState syncGetState(int64_t rid) { "progress:%lf, progress:%d", pSyncNode->vgId, pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress); + */ syncNodeRelease(pSyncNode); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 55e098fd73..9ab00cb4af 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -484,8 +484,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p taosMsleep(1); goto _out; } - ASSERT(pEntry->index == pBuf->matchIndex); - + if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ if(pNode->pLogBuf->commitIndex == pEntry->index -1){ sInfo("vgId:%d, to change config at %s. " @@ -518,6 +517,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p // replicate on demand (void)syncNodeReplicateWithoutLock(pNode); + ASSERT(pEntry->index == pBuf->matchIndex); + // update my match index matchIndex = pBuf->matchIndex; syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex); @@ -535,11 +536,12 @@ _out: int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, int32_t applyCode, bool force) { - if (pNode->replicaNum == 1 && - pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && - pNode->restoreFinish && pNode->vgId != 1 /*&& pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE*/ + //learner need to execute fsm when it catch up entry log + //if force is true, keep all contition check to execute fsm + if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 + && pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && force == false) { - sDebug("vgId:%d, not to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d," + sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d," "role:%d, restoreFinish:%d", pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish); @@ -653,17 +655,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm goto _out; } - - //TODO cdm tmp code - if(index + 1 > upperIndex){ - sInfo("vgId:%d, exeed upperIndex. index + 1:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64 ", upperIndex:%" PRId64 ", commitIndex:%" PRId64 - "matchIndex:%" PRId64, - vgId, pNextEntry->index, pNextEntry->term, role, currentTerm, upperIndex, - commitIndex, pBuf->matchIndex); - } - - //for 2->1, need to apply config change entry in sync thead + //for 2->1, need to apply config change entry in sync thread, if(pNode->replicaNum == 1){ if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 @@ -684,13 +676,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pNextEntry = NULL; } } + + if (!inBuf) { + syncEntryDestroy(pEntry); + pEntry = NULL; + } } - if (!inBuf) { - syncEntryDestroy(pEntry); - pEntry = NULL; - } - // recycle SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; for (SyncIndex index = pBuf->startIndex; index < until; index++) { -- GitLab