提交 372d26f7 编写于 作者: M Minghao Li

refactor(sync): delete assert, call FpCommitCb when multi replica

上级 ebc17283
......@@ -225,7 +225,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
sError("sync begin snapshot error");
return -1;
}
int32_t code = 0;
if (syncNodeIsMnode(pSyncNode)) {
......@@ -390,7 +390,7 @@ bool syncIsReadyForRead(int64_t rid) {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
......@@ -2462,11 +2462,36 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
} else {
syncEntryDestory(pEntry);
}
return -1;
} else {
// del resp mgr, call FpCommitCb
ASSERT(0);
SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SFsmCbMeta cbMeta = {
.index = pEntry->index,
.lastConfigIndex = SYNC_INDEX_INVALID,
.isWeak = pEntry->isWeak,
.code = -1,
.state = ths->state,
.seqNum = pEntry->seqNum,
.term = pEntry->term,
.currentTerm = ths->pRaftStore->currentTerm,
.flag = 0,
};
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
return -1;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册