diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0fe6993f1d2e35da46bd5a7d97e77303fcd08ea3..c716a17df8839ca9b6202271b567c0f9f0fa1c14 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -731,19 +731,42 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg int32_t ret = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + assert(pEntry != NULL); + if (ths->state == TAOS_SYNC_STATE_LEADER) { - SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - // ths->pFsm->FpPreCommitCb(0) + // only myself, maybe commit + syncNodeMaybeAdvanceCommitIndex(ths); // start replicate right now! syncNodeReplicate(ths); - syncEntryDestory(pEntry); + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + assert(ths->pFsm != NULL); + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + } + rpcFreeCont(rpcMsg.pCont); + } else { - // ths->pFsm->FpCommitCb(-1) + // pre commit + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + assert(ths->pFsm != NULL); + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1); + } + rpcFreeCont(rpcMsg.pCont); } + syncEntryDestory(pEntry); return ret; }