diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 38cb534d7f3e7f8343893db38b286b2da2eabdae..4324c412f7755a7414399f50f5606f7a36b8f328 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -298,35 +298,24 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; - if (pMeta->code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - rpcMsg.info = pMsg->info; - rpcMsg.info.conn.applyIndex = pMeta->index; - rpcMsg.info.conn.applyTerm = pMeta->term; + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + rpcMsg.info = pMsg->info; + rpcMsg.info.conn.applyIndex = pMeta->index; + rpcMsg.info.conn.applyTerm = pMeta->term; - const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 - ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, - pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + ", weak:%d, code:%d, state:%d %s, type:%s", + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code, + pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; - vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId, - TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { - if (pMeta->isWeak == 0) { - vnodeSyncApplyMsg(pFsm, pMsg, pMeta); - } + vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { @@ -420,7 +409,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; - vDebug("vgId:%d, become follower", pVnode->config.vgId); + vInfo("vgId:%d, become follower", pVnode->config.vgId); taosThreadMutexLock(&pVnode->lock); if (pVnode->blocked) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7dab496a5b24a87395404368594a64f5362eff2e..2aaa13f95d529447bdaf3542c1eae8b2b91d9e7b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2509,6 +2509,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde SRpcMsg rpcMsg = {0}; syncEntry2OriginalRpc(pEntry, &rpcMsg); + sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType)); + // user commit if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) { bool internalExecute = true; @@ -2516,7 +2518,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde internalExecute = false; } - sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute); + sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute, + TMSG_INFO(pEntry->msgType)); // execute fsm in apply thread, or execute outside syncPropose if (internalExecute) {