提交 0432a8ec 编写于 作者: S Shengliang Guan

refact query queue

上级 220fdfab
...@@ -22,30 +22,29 @@ ...@@ -22,30 +22,29 @@
extern "C" { extern "C" {
#endif #endif
struct SRpcMsg; typedef struct SRpcMsg SRpcMsg;
struct SEpSet; typedef struct SEpSet SEpSet;
struct SMgmtWrapper;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType; typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
typedef struct { typedef struct {
struct SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp queueFps[QUEUE_MAX];
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
} SMsgCb; } SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -70,10 +70,9 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); ...@@ -70,10 +70,9 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
* *
* @param pQnode The qnode object. * @param pQnode The qnode object.
* @param pMsg The request message * @param pMsg The request message
* @param pRsp The response message
* @return int32_t 0 for success, -1 for failure
*/ */
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg);
int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
#include "tlog.h" #include "tlog.h"
#include "tmsg.h" #include "tmsg.h"
enum { typedef enum {
JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NULL = 0,
JOB_TASK_STATUS_NOT_START = 1, JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING, JOB_TASK_STATUS_EXECUTING,
...@@ -35,12 +35,12 @@ enum { ...@@ -35,12 +35,12 @@ enum {
JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING, JOB_TASK_STATUS_DROPPING,
}; } EJobTaskType;
enum { typedef enum {
TASK_TYPE_PERSISTENT = 1, TASK_TYPE_PERSISTENT = 1,
TASK_TYPE_TEMP, TASK_TYPE_TEMP,
}; } ETaskType;
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
......
...@@ -16,16 +16,16 @@ ...@@ -16,16 +16,16 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tmsgcb.h" #include "tmsgcb.h"
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
} }
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) { int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
} }
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) { int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
} }
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
...@@ -42,31 +42,6 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -42,31 +42,6 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr());
return -1;
}
return 0;
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker);
}
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg); return dndWriteMsgToWorker(pWorker, pMsg);
...@@ -90,10 +65,10 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp ...@@ -90,10 +65,10 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
return -1; return -1;
} }
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -112,3 +87,28 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { ...@@ -112,3 +87,28 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr());
return -1;
}
return 0;
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker);
}
...@@ -42,6 +42,9 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); ...@@ -42,6 +42,9 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// qmWorker.c // qmWorker.c
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t qmStartWorker(SQnodeMgmt *pMgmt); int32_t qmStartWorker(SQnodeMgmt *pMgmt);
void qmStopWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt);
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
......
...@@ -21,6 +21,8 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndRea ...@@ -21,6 +21,8 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndRea
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
......
...@@ -16,50 +16,87 @@ ...@@ -16,50 +16,87 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
static void qmProcessQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
dTrace("msg:%p, will be processed in qnode queue", pMsg); SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
SRpcMsg *pRsp = NULL; dndSendRsp(pWrapper, &rsp);
SRpcMsg *pRpc = &pMsg->rpcMsg; }
int32_t code = qndProcessMsg(pMgmt->pQnode, pRpc, &pRsp);
static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
if (pRpc->msgType & 1u) { dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
if (pRsp != NULL) { int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
pRsp->ahandle = pRpc->ahandle; if (code != 0) {
dndSendRsp(pMgmt->pWrapper, pRsp); qmSendRsp(pMgmt->pWrapper, pMsg, code);
free(pRsp);
} else {
if (code != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
dndSendRsp(pMgmt->pWrapper, &rpcRsp);
}
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->queryWorker; dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
if (code != 0) {
qmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg); return dndWriteMsgToWorker(pWorker, pMsg);
} }
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
SDnodeWorker *pWorker = &pMgmt->fetchWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
return dndWriteMsgToWorker(pWorker, pMsg);
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
}
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
return code;
}
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
}
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
} }
int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, qmProcessQueue) != 0) { int32_t maxFetchThreads = 4;
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads;
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
qmProcessQueryQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr()); dError("failed to start qnode query worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, qmProcessQueue) != 0) { if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
qmProcessFetchQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr()); dError("failed to start qnode fetch worker since %s", terrstr());
return -1; return -1;
} }
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType;
typedef struct SVnodesMgmt { typedef struct SVnodesMgmt {
SHashObj *hash; SHashObj *hash;
SRWLatch latch; SRWLatch latch;
......
...@@ -131,6 +131,8 @@ static void *vmOpenVnodeFunc(void *param) { ...@@ -131,6 +131,8 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
......
...@@ -85,6 +85,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -85,6 +85,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
......
...@@ -152,7 +152,7 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf ...@@ -152,7 +152,7 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf
} }
} }
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
...@@ -167,20 +167,22 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue ...@@ -167,20 +167,22 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue
} }
switch (qtype) { switch (qtype) {
case VND_QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, will be written into vnode-query queue", pMsg); dTrace("msg:%p, will be written into vnode-query queue", pMsg);
code = taosWriteQitem(pVnode->pQueryQ, pMsg); code = taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;
case VND_FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case VND_WRITE_QUEUE: case WRITE_QUEUE:
dTrace("msg:%p, will be written into vnode-write queue", pMsg); dTrace("msg:%p, will be written into vnode-write queue", pMsg);
code = taosWriteQitem(pVnode->pWriteQ, pMsg); code = taosWriteQitem(pVnode->pWriteQ, pMsg);
case VND_SYNC_QUEUE: break;
case SYNC_QUEUE:
dTrace("msg:%p, will be written into vnode-sync queue", pMsg); dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
code = taosWriteQitem(pVnode->pSyncQ, pMsg); code = taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
default: default:
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
break; break;
...@@ -191,19 +193,19 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue ...@@ -191,19 +193,19 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue
} }
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE); return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
} }
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE); return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
} }
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE); return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
} }
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE); return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
} }
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
...@@ -212,7 +214,7 @@ int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -212,7 +214,7 @@ int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return dndWriteMsgToWorker(pWorker, pMsg); return dndWriteMsgToWorker(pWorker, pMsg);
} }
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
int32_t code = -1; int32_t code = -1;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
...@@ -225,20 +227,18 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue ...@@ -225,20 +227,18 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
switch (qtype) { switch (qtype) {
case VND_QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg); dTrace("msg:%p, will be put into vnode-query queue", pMsg);
code = taosWriteQitem(pVnode->pQueryQ, pMsg); code = taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;
case VND_FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
code = taosWriteQitem(pVnode->pFetchQ, pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case VND_APPLY_QUEUE: case APPLY_QUEUE:
dTrace("msg:%p, will be put into vnode-apply queue", pMsg); dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
code = taosWriteQitem(pVnode->pApplyQ, pMsg); code = taosWriteQitem(pVnode->pApplyQ, pMsg);
break; break;
case VND_WRITE_QUEUE:
case VND_SYNC_QUEUE:
default: default:
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
break; break;
...@@ -249,15 +249,15 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue ...@@ -249,15 +249,15 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue
} }
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE); return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
} }
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE); return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
} }
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE); return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
} }
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "tlog.h" #include "tlog.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tmsgcb.h"
#include "qnode.h" #include "qnode.h"
......
...@@ -34,25 +34,20 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { ...@@ -34,25 +34,20 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return pQnode; return pQnode;
} }
void qndClose(SQnode *pQnode) { void qndClose(SQnode *pQnode) {
qWorkerDestroy((void **)&pQnode->pQuery); qWorkerDestroy((void **)&pQnode->pQuery);
free(pQnode); free(pQnode);
} }
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
*pRsp = NULL;
return 0;
}
int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
qTrace("message in query queue is processing"); qTrace("message in query queue is processing");
SReadHandle handle = {0}; SReadHandle handle = {0};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY:{ case TDMT_VND_QUERY: {
return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
} }
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
...@@ -63,7 +58,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { ...@@ -63,7 +58,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
} }
} }
int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
qTrace("message in fetch queue is processing"); qTrace("message in fetch queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
...@@ -81,17 +76,13 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { ...@@ -81,17 +76,13 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
case TDMT_VND_SHOW_TABLES: case TDMT_VND_SHOW_TABLES:
return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg); return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES_FETCH: case TDMT_VND_SHOW_TABLES_FETCH:
//return vnodeGetTableList(pQnode, pMsg); // return vnodeGetTableList(pQnode, pMsg);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
//return vnodeGetTableMeta(pQnode, pMsg); // return vnodeGetTableMeta(pQnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
//return tqProcessConsumeReq(pQnode->pTq, pMsg); // return tqProcessConsumeReq(pQnode->pTq, pMsg);
default: default:
qError("unknown msg type:%d in fetch queue", pMsg->msgType); qError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册