提交 13aa5940 编写于 作者: M Minghao Li

sync refactor

上级 9db88045
...@@ -731,19 +731,42 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -731,19 +731,42 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
int32_t ret = 0; int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
assert(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ths->pLogStore->appendEntry(ths->pLogStore, pEntry); ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// ths->pFsm->FpPreCommitCb(0) // only myself, maybe commit
syncNodeMaybeAdvanceCommitIndex(ths);
// start replicate right now! // start replicate right now!
syncNodeReplicate(ths); syncNodeReplicate(ths);
syncEntryDestory(pEntry); // pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
assert(ths->pFsm != NULL);
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
} else { } else {
// ths->pFsm->FpCommitCb(-1) // pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
assert(ths->pFsm != NULL);
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1);
}
rpcFreeCont(rpcMsg.pCont);
} }
syncEntryDestory(pEntry);
return ret; return ret;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册