diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 14823be0982be118d51163d52e5ec961fcdb7733..b4a3d30f7d54a2536f3f2141f0bfdbb1e9d15bea 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -225,7 +225,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { sError("sync begin snapshot error"); return -1; } - + int32_t code = 0; if (syncNodeIsMnode(pSyncNode)) { @@ -390,7 +390,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -2462,11 +2462,36 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd } else { syncEntryDestory(pEntry); } + return -1; } else { // del resp mgr, call FpCommitCb - ASSERT(0); + + SRpcMsg rpcMsg = {0}; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + + SFsmCbMeta cbMeta = { + .index = pEntry->index, + .lastConfigIndex = SYNC_INDEX_INVALID, + .isWeak = pEntry->isWeak, + .code = -1, + .state = ths->state, + .seqNum = pEntry->seqNum, + .term = pEntry->term, + .currentTerm = ths->pRaftStore->currentTerm, + .flag = 0, + }; + + syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta); + + if (h) { + taosLRUCacheRelease(ths->pLogStore->pCache, h, false); + } else { + syncEntryDestory(pEntry); + } + return -1; } }