提交 9a540919 编写于 作者: M Minghao Li

refactor(sync): pre-commit integration

上级 49a4e83d
......@@ -501,49 +501,54 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0 && cbMeta.isWeak == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), pMsg->msgType,
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
}
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0 && cbMeta.isWeak == 1) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
if (cbMeta.isWeak == 1) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册