From 13aa59409d47b2d39ee316b9a7737e0fdddc333f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 17 Mar 2022 16:22:51 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/src/syncMain.c | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0fe6993f1d..c716a17df8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -731,19 +731,42 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + assert(pEntry != NULL); + if (ths->state == TAOS_SYNC_STATE_LEADER) { - SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - // ths->pFsm->FpPreCommitCb(0) + // only myself, maybe commit + syncNodeMaybeAdvanceCommitIndex(ths); // start replicate right now! syncNodeReplicate(ths); - syncEntryDestory(pEntry); + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + assert(ths->pFsm != NULL); + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + rpcFreeCont(rpcMsg.pCont); + } else { - // ths->pFsm->FpCommitCb(-1) + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + assert(ths->pFsm != NULL); + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + } + rpcFreeCont(rpcMsg.pCont); } + syncEntryDestory(pEntry); return ret; } -- GitLab