提交 2756c730 编写于 作者: S Shengliang Guan

change the parameters in vread queue

上级 c0aed598
......@@ -26,13 +26,6 @@
#include "dnodeVRead.h"
#include "vnode.h"
typedef struct {
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
typedef struct {
pthread_t thread; // thread
int32_t workerId; // worker ID
......@@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) {
}
dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg);
}
......
......@@ -34,6 +34,13 @@ typedef struct {
void *qhandle; //used by query and retrieve msg
} SRspRet;
typedef struct {
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
......@@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
#ifdef __cplusplus
}
......
......@@ -27,17 +27,18 @@
#include "vnodeLog.h"
#include "query.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
}
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) {
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
......@@ -55,10 +56,14 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen,
return TSDB_CODE_RPC_NOT_READY;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet;
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet));
......@@ -91,7 +96,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
return code;
}
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册