diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 413cb957d2c12d5a8ae11c063d6ebc592b084ae9..5ce4b16300f340d43d2b0e0423d4efa7821755e0 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -22,30 +22,29 @@ extern "C" { #endif -struct SRpcMsg; -struct SEpSet; -struct SMgmtWrapper; +typedef struct SRpcMsg SRpcMsg; +typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; -typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); -typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); +typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); +typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); +typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); +typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType; +typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef struct { - struct SMgmtWrapper* pWrapper; - PutToQueueFp queueFps[QUEUE_MAX]; - SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; - SendRspFp sendRspFp; + SMgmtWrapper* pWrapper; + PutToQueueFp queueFps[QUEUE_MAX]; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SMsgCb; -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq); -int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq); -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq); -void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp); +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); +int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); +int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); +void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); #ifdef __cplusplus } diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 67ee509622de950b8ede9e134cca9163f87b7d22..89553f978beaacd7f54cdce5ab786d510ecd9fc7 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -70,10 +70,9 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * * @param pQnode The qnode object. * @param pMsg The request message - * @param pRsp The response message - * @return int32_t 0 for success, -1 for failure */ -int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); +int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index b6f42eeee0ed23a98d595e03c786d9103feb6628..07a5e261a0cd2b997455f4f74f98de09d73f10a2 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,7 +25,7 @@ extern "C" { #include "tlog.h" #include "tmsg.h" -enum { +typedef enum { JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NOT_START = 1, JOB_TASK_STATUS_EXECUTING, @@ -35,12 +35,12 @@ enum { JOB_TASK_STATUS_CANCELLING, JOB_TASK_STATUS_CANCELLED, JOB_TASK_STATUS_DROPPING, -}; +} EJobTaskType; -enum { +typedef enum { TASK_TYPE_PERSISTENT = 1, TASK_TYPE_TEMP, -}; +} ETaskType; typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 19973770e4432f84001e19f11640435c6accab6b..b13f727db6dba5b593acd6bb87d739ffcd6987b3 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -16,16 +16,16 @@ #define _DEFAULT_SOURCE #include "tmsgcb.h" -int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) { +int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } -int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) { +int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) { +int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); } -void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } +void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 4fa2f60b093bf882e75c95d75bcc47bce2234652..ab9f46e323679ee25f0dbdf758a17a3288417c27 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -42,31 +42,6 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode read worker since %s", terrstr()); - return -1; - } - - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode write worker since %s", terrstr()); - return -1; - } - - if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { - dError("failed to start mnode sync worker since %s", terrstr()); - return -1; - } - - return 0; -} - -void mmStopWorker(SMnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->readWorker); - dndCleanupWorker(&pMgmt->writeWorker); - dndCleanupWorker(&pMgmt->syncWorker); -} - static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return dndWriteMsgToWorker(pWorker, pMsg); @@ -90,10 +65,10 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp return -1; } - dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); + int32_t code = dndWriteMsgToWorker(pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -112,3 +87,28 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); } + +int32_t mmStartWorker(SMnodeMgmt *pMgmt) { + if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { + dError("failed to start mnode read worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { + dError("failed to start mnode write worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { + dError("failed to start mnode sync worker since %s", terrstr()); + return -1; + } + + return 0; +} + +void mmStopWorker(SMnodeMgmt *pMgmt) { + dndCleanupWorker(&pMgmt->readWorker); + dndCleanupWorker(&pMgmt->writeWorker); + dndCleanupWorker(&pMgmt->syncWorker); +} diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 2629486d6034127f2a8689b0de29e52871377419..dc7f10b939a01b78df59d15eeca37fd77b41d1b1 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -42,6 +42,9 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // qmWorker.c +int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); + int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index 38defd1dc8ff55612fb4c03c8b4f38ec12d11b1d..40959192c3de9ad364ff712c61ebcea047a869e8 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -21,6 +21,8 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndRea static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; + msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 6285a4bb74849491279ff4f623ef84411267e0de..1a140c69e7c2899302cc65cbef61c6a4384013e2 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -16,50 +16,87 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -static void qmProcessQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed in qnode queue", pMsg); - SRpcMsg *pRsp = NULL; - SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = qndProcessMsg(pMgmt->pQnode, pRpc, &pRsp); - - if (pRpc->msgType & 1u) { - if (pRsp != NULL) { - pRsp->ahandle = pRpc->ahandle; - dndSendRsp(pMgmt->pWrapper, pRsp); - free(pRsp); - } else { - if (code != 0) code = terrno; - SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; - dndSendRsp(pMgmt->pWrapper, &rpcRsp); - } +static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; + dndSendRsp(pWrapper, &rsp); +} + +static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed in qnode-query queue", pMsg); + int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg); + if (code != 0) { + qmSendRsp(pMgmt->pWrapper, pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); } -int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->queryWorker; +static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg); + int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg); + if (code != 0) { + qmSendRsp(pMgmt->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); +} - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); +static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) { + dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return dndWriteMsgToWorker(pWorker, pMsg); } -int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->fetchWorker; +int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); } - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); +int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } + +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg == NULL) { + return -1; + } + + dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); + pMsg->rpcMsg = *pRpc; + + int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + if (code != 0) { + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } + + return code; +} + +int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); +} + +int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc); } int32_t qmStartWorker(SQnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, qmProcessQueue) != 0) { + int32_t maxFetchThreads = 4; + int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); + int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); + int32_t maxQueryThreads = minQueryThreads; + + if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads, + qmProcessQueryQueue) != 0) { dError("failed to start qnode query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, qmProcessQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads, + qmProcessFetchQueue) != 0) { dError("failed to start qnode fetch worker since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 2020a3d219292de480da9bd77af9518fbaa942d9..d670e073dacffbc20fcdd413ababc1d5be51530a 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -24,8 +24,6 @@ extern "C" { #endif -typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType; - typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 12f38d71ca28ab1b3cd59fbb336007e3fbf2a7da..1c6c7d089e8a17356e333c71b096e173873d7247 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -131,6 +131,8 @@ static void *vmOpenVnodeFunc(void *param) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; + msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index efae47b195e18fb03be03f12c6fde6fcf0b7f6d2..375a83421aa4f81dd66663737e7319217420a573 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -85,6 +85,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; + msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index fe01b19d2d25fb396a28af8a5e10eaa1c0e02d66..b0fe8d55e182118d58d8507a553da76946108353 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -152,7 +152,7 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf } } -static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) { +static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; @@ -167,20 +167,22 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue } switch (qtype) { - case VND_QUERY_QUEUE: + case QUERY_QUEUE: dTrace("msg:%p, will be written into vnode-query queue", pMsg); code = taosWriteQitem(pVnode->pQueryQ, pMsg); break; - case VND_FETCH_QUEUE: + case FETCH_QUEUE: dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); break; - case VND_WRITE_QUEUE: + case WRITE_QUEUE: dTrace("msg:%p, will be written into vnode-write queue", pMsg); code = taosWriteQitem(pVnode->pWriteQ, pMsg); - case VND_SYNC_QUEUE: + break; + case SYNC_QUEUE: dTrace("msg:%p, will be written into vnode-sync queue", pMsg); code = taosWriteQitem(pVnode->pSyncQ, pMsg); + break; default: terrno = TSDB_CODE_INVALID_PARA; break; @@ -191,19 +193,19 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue } int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE); + return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE); + return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE); + return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE); + return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { @@ -212,7 +214,7 @@ int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return dndWriteMsgToWorker(pWorker, pMsg); } -static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) { +static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; int32_t code = -1; SMsgHead *pHead = pRpc->pCont; @@ -225,20 +227,18 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; switch (qtype) { - case VND_QUERY_QUEUE: + case QUERY_QUEUE: dTrace("msg:%p, will be put into vnode-query queue", pMsg); code = taosWriteQitem(pVnode->pQueryQ, pMsg); break; - case VND_FETCH_QUEUE: + case FETCH_QUEUE: dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); code = taosWriteQitem(pVnode->pFetchQ, pMsg); break; - case VND_APPLY_QUEUE: + case APPLY_QUEUE: dTrace("msg:%p, will be put into vnode-apply queue", pMsg); code = taosWriteQitem(pVnode->pApplyQ, pMsg); break; - case VND_WRITE_QUEUE: - case VND_SYNC_QUEUE: default: terrno = TSDB_CODE_INVALID_PARA; break; @@ -249,15 +249,15 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue } int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE); + return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); } int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE); + return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE); } int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE); + return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); } int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 168e538ac0147a5966f9869b217160b5ac52484c..307bf3efb8913d4fae4966797835e432ebb051c5 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -21,6 +21,7 @@ #include "tlog.h" #include "tmsg.h" #include "trpc.h" +#include "tmsgcb.h" #include "qnode.h" diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 37fd4cbed527ec22149b30d6753c93e98f74dfaf..bc1a602b52ed3218d7a912e5c001e63299d08ab4 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -34,25 +34,20 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return pQnode; } -void qndClose(SQnode *pQnode) { +void qndClose(SQnode *pQnode) { qWorkerDestroy((void **)&pQnode->pQuery); - free(pQnode); + free(pQnode); } int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } -int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - *pRsp = NULL; - return 0; -} - -int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { +int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { qTrace("message in query queue is processing"); SReadHandle handle = {0}; switch (pMsg->msgType) { - case TDMT_VND_QUERY:{ + case TDMT_VND_QUERY: { return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg); } case TDMT_VND_QUERY_CONTINUE: @@ -63,7 +58,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { } } -int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { +int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { qTrace("message in fetch queue is processing"); switch (pMsg->msgType) { case TDMT_VND_FETCH: @@ -81,17 +76,13 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { case TDMT_VND_SHOW_TABLES: return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg); case TDMT_VND_SHOW_TABLES_FETCH: - //return vnodeGetTableList(pQnode, pMsg); + // return vnodeGetTableList(pQnode, pMsg); case TDMT_VND_TABLE_META: - //return vnodeGetTableMeta(pQnode, pMsg); + // return vnodeGetTableMeta(pQnode, pMsg); case TDMT_VND_CONSUME: - //return tqProcessConsumeReq(pQnode->pTq, pMsg); + // return tqProcessConsumeReq(pQnode->pTq, pMsg); default: qError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; } } - - - -