提交 952bf4f0 编写于 作者: S Shengliang Guan

process query msg

上级 50229512
......@@ -16,6 +16,11 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp);
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
......@@ -44,8 +49,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
if (msgType & 1u) {
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
dndSendRsp(pMgmt->pWrapper, &rsp);
vmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......@@ -54,18 +58,27 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode query queue", pMsg);
vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode fetch queue", pMsg);
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
}
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
......@@ -101,8 +114,7 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
free(pRsp);
} else {
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
dndSendRsp(pVnode->pWrapper, &rpcRsp);
vmSendRsp(pVnode->pWrapper, pMsg, code);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册