From 4535722957103e94d3bbd595047af4bc92079238 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 15:41:04 +0800 Subject: [PATCH] enh(sync) sync/mnode integration, syncStart async -> sync --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncIO.h | 4 ++-- source/libs/sync/inc/syncInt.h | 5 +++++ source/libs/sync/src/syncAppendEntries.c | 20 ++++++++++++++++- source/libs/sync/src/syncCommit.c | 25 ++++++++++++++++++---- source/libs/sync/src/syncMain.c | 21 +++++++++++++++++- source/libs/sync/test/syncSnapshotTest.cpp | 2 ++ 7 files changed, 70 insertions(+), 9 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 872785abe5..1eed353f33 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -87,6 +87,7 @@ typedef struct SSyncFSM { void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinish)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); } SSyncFSM; @@ -119,7 +120,6 @@ typedef struct SSyncLogStore { } SSyncLogStore; - typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index f65a317694..b69c087b5f 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,8 +36,8 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; SMsgCb msgcb; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 768e1c1cf1..d3345620e9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -147,6 +147,11 @@ typedef struct SSyncNode { // tools SSyncRespMgr* pSyncRespMgr; + // restore state + bool restoreFinish; + sem_t restoreSem; + SSnapshot* pSnapshot; + } SSyncNode; // open/close -------------- diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bf1eeb4186..18735f5b71 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -335,6 +334,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + + bool needExecute = true; + if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -351,6 +359,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } } + // restore finish + if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (ths->restoreFinish == false) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + ths->restoreFinish = true; + + tsem_post(&ths->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 01e408e543..557f69a2ce 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -110,9 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + + bool needExecute = true; + if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -129,6 +136,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // restore finish + if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { + if (pSyncNode->restoreFinish == false) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + pSyncNode->restoreFinish = true; + + tsem_post(&pSyncNode->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f4f09d0f7b..bffebc1693 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); @@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); assert(pSyncNode->pSyncRespMgr != NULL); + // restore state + pSyncNode->restoreFinish = false; + pSyncNode->pSnapshot = NULL; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + } + tsem_init(&(pSyncNode->restoreSem), 0, 0); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + + tsem_wait(&pSyncNode->restoreSem); return; } @@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + + tsem_wait(&pSyncNode->restoreSem); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + if (pSyncNode->pSnapshot != NULL) { + taosMemoryFree(pSyncNode->pSnapshot); + } + + tsem_destroy(&pSyncNode->restoreSem); + // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 62bda5b22e..8ccd698907 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) { } int main(int argc, char **argv) { + sprintf(tsTempDir, "%s", "."); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; -- GitLab