未验证 提交 bb052b81 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18431 from taosdata/fix/sync

fix(sync): if msg commit, put it into apply-queue, do not care return code
......@@ -298,35 +298,24 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
if (pMeta->code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
rpcMsg.info = pMsg->info;
rpcMsg.info.conn.applyIndex = pMeta->index;
rpcMsg.info.conn.applyTerm = pMeta->term;
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
rpcMsg.info = pMsg->info;
rpcMsg.info.conn.applyIndex = pMeta->index;
rpcMsg.info.conn.applyTerm = pMeta->term;
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code,
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId,
TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
}
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 0) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
......@@ -420,7 +409,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become follower", pVnode->config.vgId);
vInfo("vgId:%d, become follower", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) {
......
......@@ -2509,6 +2509,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
SRpcMsg rpcMsg = {0};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
sTrace("do commit index:%" PRId64 ", type:%s", i, TMSG_INFO(pEntry->msgType));
// user commit
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
bool internalExecute = true;
......@@ -2516,7 +2518,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
internalExecute = false;
}
sNTrace(ths, "commit index:%" PRId64 ", internal:%d", i, internalExecute);
sNTrace(ths, "user commit index:%" PRId64 ", internal:%d, type:%s", i, internalExecute,
TMSG_INFO(pEntry->msgType));
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册